samu's picture
backend with database
f9b55c1
raw
history blame
20.2 kB
import aiosqlite
import json
import os
from typing import Optional, List, Dict, Any
from datetime import datetime
import uuid
import logging
logger = logging.getLogger(__name__)
# Database file path
DB_PATH = os.getenv("DATABASE_PATH", "/app/ai_tutor.db")
class Database:
"""Pure SQLite database handler for AI Language Tutor"""
def __init__(self, db_path: str = DB_PATH):
self.db_path = db_path
async def initialize(self):
"""Initialize database with schema"""
async with aiosqlite.connect(self.db_path) as db:
# Read and execute schema - look for it in parent directory
schema_path = os.path.join(os.path.dirname(__file__), 'schema.sql')
with open(schema_path, 'r') as f:
schema = f.read()
await db.executescript(schema)
await db.commit()
logger.info("Database initialized successfully")
async def find_existing_curriculum(
self,
query: str,
native_language: str,
target_language: str,
proficiency: str,
user_id: Optional[int] = None
) -> Optional[Dict[str, Any]]:
"""Find existing curriculum for exact query and metadata match"""
logger.info(f"Looking for curriculum: query='{query[:50]}...', native={native_language}, target={target_language}, proficiency={proficiency}, user_id={user_id}")
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
# Always look for exact query matches first, prioritizing user-specific matches
if user_id is not None:
# User-specific search: Find exact query match for the user
logger.info(f"Searching for exact match for user {user_id}")
async with db.execute("""
SELECT c.*, m.native_language, m.target_language, m.proficiency, m.title, m.query
FROM curricula c
JOIN metadata_extractions m ON c.metadata_extraction_id = m.id
WHERE m.user_id = ? AND m.query = ? AND m.native_language = ?
AND m.target_language = ? AND m.proficiency = ?
ORDER BY c.created_at DESC
LIMIT 1
""", (user_id, query, native_language, target_language, proficiency)) as cursor:
row = await cursor.fetchone()
if row:
logger.info(f"Found exact user match: {dict(row)['id']}")
return dict(row)
# Look for exact query match from any user (only if the query is exactly the same)
logger.info("Searching for exact query match (any user)")
async with db.execute("""
SELECT c.*, m.native_language, m.target_language, m.proficiency, m.title, m.query
FROM curricula c
JOIN metadata_extractions m ON c.metadata_extraction_id = m.id
WHERE m.query = ? AND m.native_language = ? AND m.target_language = ? AND m.proficiency = ?
ORDER BY c.created_at DESC
LIMIT 1
""", (query, native_language, target_language, proficiency)) as cursor:
row = await cursor.fetchone()
if row:
logger.info(f"Found exact query match: {dict(row)['id']}")
return dict(row)
else:
logger.info("No exact query match found")
logger.info("No existing curriculum found")
return None
async def save_metadata_extraction(
self,
query: str,
metadata: Dict[str, Any],
user_id: Optional[int] = None
) -> str:
"""Save extracted metadata and return extraction ID"""
extraction_id = str(uuid.uuid4())
# Validate proficiency before inserting into the database
allowed_proficiencies = {"beginner", "intermediate", "advanced"}
proficiency = metadata.get('proficiency')
if proficiency not in allowed_proficiencies:
logger.warning(
f"Unknown proficiency '{proficiency}' received; defaulting to 'beginner'."
)
proficiency = "beginner"
metadata["proficiency"] = "beginner"
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
INSERT INTO metadata_extractions
(id, user_id, query, native_language, target_language, proficiency, title, description, metadata_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
extraction_id,
user_id,
query,
metadata.get('native_language'),
metadata.get('target_language'),
metadata.get('proficiency'),
metadata.get('title'),
metadata.get('description'),
json.dumps(metadata)
))
await db.commit()
logger.info(f"Saved metadata extraction: {extraction_id}")
return extraction_id
async def save_curriculum(
self,
metadata_extraction_id: str,
curriculum: Dict[str, Any],
user_id: Optional[int] = None
) -> str:
"""Save generated curriculum and return curriculum ID"""
curriculum_id = str(uuid.uuid4())
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
INSERT INTO curricula
(id, metadata_extraction_id, user_id, lesson_topic, curriculum_json, content_generation_status)
VALUES (?, ?, ?, ?, ?, 'pending')
""", (
curriculum_id,
metadata_extraction_id,
user_id,
curriculum.get('lesson_topic', ''),
json.dumps(curriculum)
))
await db.commit()
logger.info(f"Saved curriculum: {curriculum_id}")
return curriculum_id
async def copy_curriculum_for_user(
self,
source_curriculum_id: str,
metadata_extraction_id: str,
user_id: Optional[int] = None
) -> str:
"""Copy an existing curriculum for a new user"""
new_curriculum_id = str(uuid.uuid4())
async with aiosqlite.connect(self.db_path) as db:
# Get source curriculum
async with db.execute("""
SELECT lesson_topic, curriculum_json FROM curricula WHERE id = ?
""", (source_curriculum_id,)) as cursor:
row = await cursor.fetchone()
if not row:
raise ValueError(f"Source curriculum {source_curriculum_id} not found")
lesson_topic, curriculum_json = row
# Create new curriculum
await db.execute("""
INSERT INTO curricula
(id, metadata_extraction_id, user_id, lesson_topic, curriculum_json, is_content_generated, content_generation_status)
VALUES (?, ?, ?, ?, ?, 0, 'pending')
""", (
new_curriculum_id,
metadata_extraction_id,
user_id,
lesson_topic,
curriculum_json
))
# Copy all learning content
await db.execute("""
INSERT INTO learning_content
(id, curriculum_id, content_type, lesson_index, lesson_topic, content_json)
SELECT
lower(hex(randomblob(16))),
?,
content_type,
lesson_index,
lesson_topic,
content_json
FROM learning_content
WHERE curriculum_id = ?
""", (new_curriculum_id, source_curriculum_id))
# Mark as content generated
await db.execute("""
UPDATE curricula
SET is_content_generated = 1,
content_generation_status = 'completed',
content_generation_completed_at = CURRENT_TIMESTAMP
WHERE id = ?
""", (new_curriculum_id,))
await db.commit()
logger.info(f"Copied curriculum {source_curriculum_id} to {new_curriculum_id} for user {user_id}")
return new_curriculum_id
async def save_learning_content(
self,
curriculum_id: str,
content_type: str,
lesson_index: int,
lesson_topic: str,
content: Any
) -> str:
"""Save learning content (flashcards, exercises, or simulation)"""
content_id = str(uuid.uuid4())
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
INSERT INTO learning_content
(id, curriculum_id, content_type, lesson_index, lesson_topic, content_json)
VALUES (?, ?, ?, ?, ?, ?)
""", (
content_id,
curriculum_id,
content_type,
lesson_index,
lesson_topic,
json.dumps(content) if isinstance(content, (dict, list)) else content
))
await db.commit()
logger.info(f"Saved {content_type} for lesson {lesson_index}")
return content_id
async def mark_curriculum_content_generated(self, curriculum_id: str):
"""Mark curriculum as having all content generated"""
async with aiosqlite.connect(self.db_path) as db:
await db.execute("""
UPDATE curricula
SET is_content_generated = 1,
content_generation_status = 'completed',
content_generation_completed_at = CURRENT_TIMESTAMP
WHERE id = ?
""", (curriculum_id,))
await db.commit()
async def update_content_generation_status(
self,
curriculum_id: str,
status: str,
error_message: Optional[str] = None
):
"""Update content generation status for a curriculum"""
async with aiosqlite.connect(self.db_path) as db:
if status == 'generating':
await db.execute("""
UPDATE curricula
SET content_generation_status = ?,
content_generation_started_at = CURRENT_TIMESTAMP,
content_generation_error = NULL
WHERE id = ?
""", (status, curriculum_id))
elif status == 'completed':
await db.execute("""
UPDATE curricula
SET content_generation_status = ?,
content_generation_completed_at = CURRENT_TIMESTAMP,
content_generation_error = NULL,
is_content_generated = 1
WHERE id = ?
""", (status, curriculum_id))
elif status == 'failed':
await db.execute("""
UPDATE curricula
SET content_generation_status = ?,
content_generation_error = ?
WHERE id = ?
""", (status, error_message, curriculum_id))
else:
await db.execute("""
UPDATE curricula
SET content_generation_status = ?,
content_generation_error = ?
WHERE id = ?
""", (status, error_message, curriculum_id))
await db.commit()
async def get_content_generation_status(self, curriculum_id: str) -> Optional[Dict[str, Any]]:
"""Get content generation status for a curriculum"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute("""
SELECT
id,
content_generation_status,
content_generation_error,
content_generation_started_at,
content_generation_completed_at,
is_content_generated
FROM curricula
WHERE id = ?
""", (curriculum_id,)) as cursor:
row = await cursor.fetchone()
if row:
return dict(row)
return None
async def get_metadata_extraction(self, extraction_id: str) -> Optional[Dict[str, Any]]:
"""Get metadata extraction by ID"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute("""
SELECT * FROM metadata_extractions WHERE id = ?
""", (extraction_id,)) as cursor:
row = await cursor.fetchone()
if row:
return dict(row)
return None
async def get_curriculum(self, curriculum_id: str) -> Optional[Dict[str, Any]]:
"""Get curriculum by ID"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute("""
SELECT c.*, m.native_language, m.target_language, m.proficiency
FROM curricula c
JOIN metadata_extractions m ON c.metadata_extraction_id = m.id
WHERE c.id = ?
""", (curriculum_id,)) as cursor:
row = await cursor.fetchone()
if row:
return dict(row)
return None
async def get_learning_content(
self,
curriculum_id: str,
content_type: Optional[str] = None,
lesson_index: Optional[int] = None
) -> List[Dict[str, Any]]:
"""Get learning content for a curriculum"""
query = "SELECT * FROM learning_content WHERE curriculum_id = ?"
params = [curriculum_id]
if content_type:
query += " AND content_type = ?"
params.append(content_type)
if lesson_index is not None:
query += " AND lesson_index = ?"
params.append(lesson_index)
query += " ORDER BY lesson_index"
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute(query, params) as cursor:
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def get_user_metadata_extractions(
self,
user_id: int,
limit: int = 20
) -> List[Dict[str, Any]]:
"""Get user's metadata extraction history"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute("""
SELECT * FROM metadata_extractions
WHERE user_id = ?
ORDER BY created_at DESC
LIMIT ?
""", (user_id, limit)) as cursor:
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def get_user_curricula(
self,
user_id: int,
limit: int = 20
) -> List[Dict[str, Any]]:
"""Get user's curricula"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute("""
SELECT c.*, m.native_language, m.target_language, m.proficiency, m.title
FROM curricula c
JOIN metadata_extractions m ON c.metadata_extraction_id = m.id
WHERE c.user_id = ?
ORDER BY c.created_at DESC
LIMIT ?
""", (user_id, limit)) as cursor:
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def get_user_learning_journeys(
self,
user_id: int,
limit: int = 20
) -> List[Dict[str, Any]]:
"""Get user's complete learning journeys"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute("""
SELECT * FROM user_learning_journeys
WHERE user_id = ?
LIMIT ?
""", (user_id, limit)) as cursor:
rows = await cursor.fetchall()
return [dict(row) for row in rows]
async def get_curriculum_content_status(self, curriculum_id: str) -> Optional[Dict[str, Any]]:
"""Get content generation status for a curriculum"""
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute("""
SELECT * FROM curriculum_content_status WHERE curriculum_id = ?
""", (curriculum_id,)) as cursor:
row = await cursor.fetchone()
if row:
return dict(row)
return None
async def get_full_curriculum_details(self, curriculum_id: str, include_content: bool = True) -> Optional[Dict[str, Any]]:
"""Get full curriculum details, optionally including all content."""
curriculum = await self.get_curriculum(curriculum_id)
if not curriculum:
return None
try:
curriculum_data = json.loads(curriculum['curriculum_json'])
lessons = curriculum_data.get('sub_topics', [])
except json.JSONDecodeError:
curriculum_data = {}
lessons = []
if include_content:
content_list = await self.get_learning_content(curriculum_id)
content_map = {}
for content in content_list:
lesson_index = content['lesson_index']
content_type = content['content_type']
if lesson_index not in content_map:
content_map[lesson_index] = {}
try:
parsed_content = json.loads(content['content_json'])
except json.JSONDecodeError:
parsed_content = content['content_json']
content_map[lesson_index][content_type] = {
"id": content['id'],
"lesson_topic": content['lesson_topic'],
"content": parsed_content,
"created_at": content['created_at']
}
# Embed content into lessons
for i, lesson in enumerate(lessons):
lesson['content'] = content_map.get(i, {})
curriculum['curriculum'] = curriculum_data
del curriculum['curriculum_json']
return curriculum
async def search_curricula_by_languages(
self,
native_language: str,
target_language: str,
proficiency: Optional[str] = None,
limit: int = 10
) -> List[Dict[str, Any]]:
"""Search for existing curricula by language combination"""
query = """
SELECT c.*, m.native_language, m.target_language, m.proficiency, m.title
FROM curricula c
JOIN metadata_extractions m ON c.metadata_extraction_id = m.id
WHERE m.native_language = ? AND m.target_language = ?
"""
params = [native_language, target_language]
if proficiency:
query += " AND m.proficiency = ?"
params.append(proficiency)
query += " ORDER BY c.created_at DESC LIMIT ?"
params.append(limit)
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
async with db.execute(query, params) as cursor:
rows = await cursor.fetchall()
return [dict(row) for row in rows]
# Global database instance
db = Database()