Aperture / app /database.py
KSvend
fix: sanitize NaN/inf floats to prevent JSON serialization crashes
2850a7e
from __future__ import annotations
import json
import uuid
from datetime import datetime, UTC
import aiosqlite
from app.models import Job, JobRequest, JobStatus, ProductResult
class Database:
def __init__(self, db_path: str = "aperture.db") -> None:
self.db_path = db_path
self._initialized = False
async def _ensure_init(self) -> None:
if not self._initialized:
await self.init()
async def init(self) -> None:
self._initialized = True
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"""
CREATE TABLE IF NOT EXISTS jobs (
id TEXT PRIMARY KEY,
request_json TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'queued',
progress_json TEXT NOT NULL DEFAULT '{}',
results_json TEXT NOT NULL DEFAULT '[]',
error TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
"""
)
await db.commit()
# Migration: add email column if missing
cur = await db.execute("PRAGMA table_info(jobs)")
columns = {row[1] for row in await cur.fetchall()}
if "email" not in columns:
await db.execute(
"ALTER TABLE jobs ADD COLUMN email TEXT NOT NULL DEFAULT ''"
)
await db.commit()
async def create_job(self, request: JobRequest) -> str:
await self._ensure_init()
job_id = uuid.uuid4().hex[:12]
now = datetime.now(UTC).isoformat()
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"INSERT INTO jobs (id, request_json, status, created_at, updated_at, email) VALUES (?, ?, ?, ?, ?, ?)",
(job_id, request.model_dump_json(), JobStatus.QUEUED.value, now, now, request.email),
)
await db.commit()
return job_id
async def get_job(self, job_id: str) -> Job | None:
await self._ensure_init()
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute("SELECT * FROM jobs WHERE id = ?", (job_id,))
row = await cursor.fetchone()
if row is None:
return None
return self._row_to_job(row)
async def update_job_status(
self, job_id: str, status: JobStatus, error: str | None = None
) -> None:
await self._ensure_init()
now = datetime.now(UTC).isoformat()
async with aiosqlite.connect(self.db_path) as db:
await db.execute(
"UPDATE jobs SET status = ?, error = ?, updated_at = ? WHERE id = ?",
(status.value, error, now, job_id),
)
await db.commit()
async def update_job_progress(
self, job_id: str, product_id: str, status: str
) -> None:
await self._ensure_init()
now = datetime.now(UTC).isoformat()
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"SELECT progress_json FROM jobs WHERE id = ?", (job_id,)
)
row = await cursor.fetchone()
progress = json.loads(row[0])
progress[product_id] = status
await db.execute(
"UPDATE jobs SET progress_json = ?, updated_at = ? WHERE id = ?",
(json.dumps(progress), now, job_id),
)
await db.commit()
async def save_job_result(self, job_id: str, result: ProductResult) -> None:
await self._ensure_init()
now = datetime.now(UTC).isoformat()
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(
"SELECT results_json FROM jobs WHERE id = ?", (job_id,)
)
row = await cursor.fetchone()
results = json.loads(row[0])
from app.models import sanitize_for_json
results.append(sanitize_for_json(result.model_dump()))
await db.execute(
"UPDATE jobs SET results_json = ?, updated_at = ? WHERE id = ?",
(json.dumps(results), now, job_id),
)
await db.commit()
@staticmethod
def _row_to_job(row) -> Job:
return Job(
id=row["id"],
request=JobRequest.model_validate_json(row["request_json"]),
status=JobStatus(row["status"]),
progress=json.loads(row["progress_json"]),
results=[
ProductResult.model_validate(r)
for r in json.loads(row["results_json"])
],
error=row["error"],
created_at=datetime.fromisoformat(row["created_at"]),
updated_at=datetime.fromisoformat(row["updated_at"]),
)
async def get_next_queued_job(self) -> Job | None:
await self._ensure_init()
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
"SELECT * FROM jobs WHERE status = ? ORDER BY created_at ASC LIMIT 1",
(JobStatus.QUEUED.value,),
)
row = await cursor.fetchone()
if row is None:
return None
return self._row_to_job(row)
async def get_jobs_by_email(self, email: str) -> list[Job]:
await self._ensure_init()
async with aiosqlite.connect(self.db_path) as db:
db.row_factory = aiosqlite.Row
cur = await db.execute(
"SELECT * FROM jobs WHERE email = ? ORDER BY created_at DESC",
(email,),
)
rows = await cur.fetchall()
return [self._row_to_job(row) for row in rows]