# Update openspace/skill_engine/store.py (commit ef4f876)
"""
Storage location: <project_root>/.openspace/openspace.db
Tables:
skill_records β€” SkillRecord main table
skill_lineage_parents β€” Lineage parent-child relationships (many-to-many)
execution_analyses β€” ExecutionAnalysis records (one per task)
skill_judgments β€” Per-skill judgments within an analysis
skill_tool_deps β€” Tool dependencies
skill_tags β€” Auxiliary tags
"""
from __future__ import annotations
import asyncio
import json
import os
import sqlite3
import threading
import time
from contextlib import contextmanager
from datetime import datetime
from functools import wraps
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional
try:
import libsql_experimental as libsql
except ImportError:
libsql = None
class _LibsqlCursorProxy:
def __init__(self, cursor, conn_proxy):
self._cursor = cursor
self._conn_proxy = conn_proxy
def execute(self, *args, **kwargs):
self._cursor.execute(*args, **kwargs)
return self
def executescript(self, *args, **kwargs):
self._cursor.executescript(*args, **kwargs)
return self
def fetchone(self):
row = self._cursor.fetchone()
if row is not None and self._conn_proxy.row_factory:
return self._conn_proxy.row_factory(self, row)
return row
def fetchall(self):
rows = self._cursor.fetchall()
if self._conn_proxy.row_factory:
return [self._conn_proxy.row_factory(self, row) for row in rows]
return rows
@property
def description(self):
return getattr(self._cursor, "description", [])
@property
def rowcount(self):
return getattr(self._cursor, "rowcount", -1)
@property
def lastrowid(self):
return getattr(self._cursor, "lastrowid", None)
class _LibsqlConnectionProxy:
def __init__(self, conn):
self._conn = conn
self.row_factory = None
def execute(self, *args, **kwargs):
cursor = self.cursor()
return cursor.execute(*args, **kwargs)
def executescript(self, *args, **kwargs):
cursor = self.cursor()
return cursor.executescript(*args, **kwargs)
def cursor(self):
return _LibsqlCursorProxy(self._conn.cursor(), self)
def commit(self):
if hasattr(self._conn, "commit"):
return self._conn.commit()
def rollback(self):
if hasattr(self._conn, "rollback"):
return self._conn.rollback()
def close(self):
if hasattr(self._conn, "close"):
return self._conn.close()
class _RowProxy:
def __init__(self, row, description):
self._row = row
self._description = description
self._col_map = {col[0]: idx for idx, col in enumerate(description)}
def __getitem__(self, item):
if isinstance(item, int):
return self._row[item]
if item in self._col_map:
return self._row[self._col_map[item]]
raise KeyError(item)
def keys(self):
return self._col_map.keys()
def __iter__(self):
return iter(self._row)
def __len__(self):
return len(self._row)
def _dict_factory(cursor, row):
if hasattr(cursor, "description") and cursor.description:
return _RowProxy(row, cursor.description)
return row
from .patch import collect_skill_snapshot, compute_unified_diff
from .types import (
EvolutionSuggestion,
ExecutionAnalysis,
SkillCategory,
SkillJudgment,
SkillLineage,
SkillOrigin,
SkillRecord,
SkillVisibility,
)
from openspace.utils.logging import Logger
from openspace.config.constants import PROJECT_ROOT
logger = Logger.get_logger(__name__)
def _db_retry(
max_retries: int = 5,
initial_delay: float = 0.1,
backoff: float = 2.0,
):
"""Retry on transient SQLite/Turso errors with exponential backoff.
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
delay = initial_delay
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as exc:
err_name = type(exc).__name__
if err_name not in ("OperationalError", "DatabaseError", "LibsqlError", "Error"):
raise
if attempt == max_retries - 1:
logger.error(
f"DB {func.__name__} failed after "
f"{max_retries} retries: {exc}"
)
raise
logger.warning(
f"DB {func.__name__} retry {attempt + 1}"
f"/{max_retries}: {exc}"
)
time.sleep(delay)
delay *= backoff
return wrapper
return decorator
_DDL = """
CREATE TABLE IF NOT EXISTS skill_records (
skill_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT NOT NULL DEFAULT '',
path TEXT NOT NULL DEFAULT '',
is_active INTEGER NOT NULL DEFAULT 1,
category TEXT NOT NULL DEFAULT 'workflow',
visibility TEXT NOT NULL DEFAULT 'private',
creator_id TEXT NOT NULL DEFAULT '',
lineage_origin TEXT NOT NULL DEFAULT 'imported',
lineage_generation INTEGER NOT NULL DEFAULT 0,
lineage_source_task_id TEXT,
lineage_change_summary TEXT NOT NULL DEFAULT '',
lineage_content_diff TEXT NOT NULL DEFAULT '',
lineage_content_snapshot TEXT NOT NULL DEFAULT '{}',
lineage_created_at TEXT NOT NULL,
lineage_created_by TEXT NOT NULL DEFAULT '',
total_selections INTEGER NOT NULL DEFAULT 0,
total_applied INTEGER NOT NULL DEFAULT 0,
total_completions INTEGER NOT NULL DEFAULT 0,
total_fallbacks INTEGER NOT NULL DEFAULT 0,
first_seen TEXT NOT NULL,
last_updated TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_sr_category ON skill_records(category);
CREATE INDEX IF NOT EXISTS idx_sr_updated ON skill_records(last_updated);
CREATE INDEX IF NOT EXISTS idx_sr_active ON skill_records(is_active);
CREATE INDEX IF NOT EXISTS idx_sr_name ON skill_records(name);
CREATE TABLE IF NOT EXISTS skill_lineage_parents (
skill_id TEXT NOT NULL
REFERENCES skill_records(skill_id) ON DELETE CASCADE,
parent_skill_id TEXT NOT NULL,
PRIMARY KEY (skill_id, parent_skill_id)
);
CREATE INDEX IF NOT EXISTS idx_lp_parent
ON skill_lineage_parents(parent_skill_id);
-- One row per task. task_id is UNIQUE (at most one analysis per task).
CREATE TABLE IF NOT EXISTS execution_analyses (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL UNIQUE,
timestamp TEXT NOT NULL,
task_completed INTEGER NOT NULL DEFAULT 0,
execution_note TEXT NOT NULL DEFAULT '',
tool_issues TEXT NOT NULL DEFAULT '[]',
candidate_for_evolution INTEGER NOT NULL DEFAULT 0,
evolution_suggestions TEXT NOT NULL DEFAULT '[]',
analyzed_by TEXT NOT NULL DEFAULT '',
analyzed_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_ea_task ON execution_analyses(task_id);
CREATE INDEX IF NOT EXISTS idx_ea_ts ON execution_analyses(timestamp);
-- Per-skill judgments within an analysis.
-- FK to execution_analyses.id (CASCADE delete).
-- skill_id is a plain TEXT β€” no FK to skill_records so that
-- historical judgments survive skill deletion.
CREATE TABLE IF NOT EXISTS skill_judgments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
analysis_id INTEGER NOT NULL
REFERENCES execution_analyses(id) ON DELETE CASCADE,
skill_id TEXT NOT NULL,
skill_applied INTEGER NOT NULL DEFAULT 0,
note TEXT NOT NULL DEFAULT '',
UNIQUE(analysis_id, skill_id)
);
CREATE INDEX IF NOT EXISTS idx_sj_skill ON skill_judgments(skill_id);
CREATE INDEX IF NOT EXISTS idx_sj_analysis ON skill_judgments(analysis_id);
CREATE TABLE IF NOT EXISTS skill_tool_deps (
skill_id TEXT NOT NULL
REFERENCES skill_records(skill_id) ON DELETE CASCADE,
tool_key TEXT NOT NULL,
critical INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (skill_id, tool_key)
);
CREATE INDEX IF NOT EXISTS idx_td_tool ON skill_tool_deps(tool_key);
CREATE TABLE IF NOT EXISTS skill_tags (
skill_id TEXT NOT NULL
REFERENCES skill_records(skill_id) ON DELETE CASCADE,
tag TEXT NOT NULL,
PRIMARY KEY (skill_id, tag)
);
"""
class SkillStore:
    """SQLite persistence engine — skill quality tracking and evolution ledger.

    Architecture:
        Write path: async method → asyncio.to_thread → _xxx_sync → self._mu lock → self._conn
        Read path:  sync method → self._reader() → independent short-lived
                    connection (WAL allows concurrent readers with one writer)
        Lifecycle:  ``__init__()`` → use → ``close()``

    Also supports the async context manager protocol::

        async with SkillStore() as store:
            await store.save_record(record)
            rec = store.load_record(skill_id)
    """
def __init__(self, db_path: Optional[Path] = None) -> None:
    """Open (creating if necessary) the store.

    Args:
        db_path: Explicit database file. Defaults to
            ``<PROJECT_ROOT>/.openspace/openspace.db``.
    """
    if db_path is None:
        base = PROJECT_ROOT / ".openspace"
        base.mkdir(parents=True, exist_ok=True)
        db_path = base / "openspace.db"
    self._db_path = Path(db_path)
    self._mu = threading.Lock()
    self._closed = False
    # Crash recovery must run before any connection touches the files.
    self._cleanup_wal_on_startup()
    # One long-lived write connection; reads open their own (see _reader).
    self._conn = self._make_connection(read_only=False)
    self._init_db()
    logger.debug(f"SkillStore ready at {self._db_path}")
def _make_connection(self, *, read_only: bool) -> sqlite3.Connection:
    """Create a tuned SQLite — or remote Turso/libsql — connection.

    Write connections use ``check_same_thread=False`` so they can be
    driven from worker threads via ``asyncio.to_thread()``; read
    connections additionally get the ``query_only=ON`` pragma.
    NOTE(review): when Turso is configured the return value is a
    ``_LibsqlConnectionProxy``, not a real ``sqlite3.Connection``.
    """
    turso_url = os.environ.get("TURSO_DATABASE_URL")
    turso_token = os.environ.get("TURSO_AUTH_TOKEN")
    if turso_url and libsql is not None:
        # Remote Turso database adapted to the sqlite3 interface.
        proxy = _LibsqlConnectionProxy(
            libsql.connect(turso_url, auth_token=turso_token)
        )
        proxy.row_factory = _dict_factory
        return proxy
    conn = sqlite3.connect(
        str(self._db_path),
        timeout=30.0,
        check_same_thread=False,
    )
    conn.execute("PRAGMA journal_mode=WAL")
    conn.execute("PRAGMA busy_timeout=30000")
    conn.execute("PRAGMA synchronous=NORMAL")
    conn.execute("PRAGMA cache_size=-16000")  # negative = KiB, i.e. 16 MB
    conn.execute("PRAGMA temp_store=MEMORY")
    conn.execute("PRAGMA foreign_keys=ON")
    if read_only:
        conn.execute("PRAGMA query_only=ON")
    conn.row_factory = sqlite3.Row
    return conn
@contextmanager
def _reader(self) -> Generator[sqlite3.Connection, None, None]:
    """Yield a temporary read-only connection, closed on exit.

    WAL mode allows many concurrent readers alongside the single
    writer, so each read gets its own short-lived connection: reads
    never block the event loop and never contend with ``self._mu``.
    """
    self._ensure_open()
    conn = self._make_connection(read_only=True)
    try:
        yield conn
    finally:
        # Always close, even if the caller's body raised.
        conn.close()
def _cleanup_wal_on_startup(self) -> None:
    """Drop stale WAL/SHM companions left by an unclean shutdown.

    A zero-byte main DB file alongside surviving WAL/SHM companions is
    unrecoverable; deleting the companions lets SQLite start fresh.
    """
    if not self._db_path.exists():
        return
    if self._db_path.stat().st_size != 0:
        return
    companions = [
        Path(f"{self._db_path}-wal"),
        Path(f"{self._db_path}-shm"),
    ]
    stale = [p for p in companions if p.exists()]
    if not stale:
        return
    logger.warning(
        "Empty DB with WAL/SHM — removing for crash recovery"
    )
    for p in stale:
        p.unlink()
@_db_retry()
def _init_db(self) -> None:
    """Create tables if they don't exist (idempotent via IF NOT EXISTS)."""
    with self._mu:
        self._conn.executescript(_DDL)
        self._conn.commit()
def close(self) -> None:
    """Close the persistent write connection; later operations raise.

    Before closing, the WAL is checkpointed into the main ``.db`` file
    so external tools (DB browsers, backup scripts) see complete data
    without needing to understand SQLite WAL mode. Idempotent, and
    never raises — shutdown must not fail.
    """
    if self._closed:
        return
    self._closed = True
    try:
        # Checkpoint only applies to local SQLite; Turso has no WAL file here.
        if not os.environ.get("TURSO_DATABASE_URL"):
            self._conn.execute("PRAGMA wal_checkpoint(TRUNCATE)")
        self._conn.close()
    except Exception:
        # Best-effort cleanup: swallow everything during shutdown.
        pass
    logger.debug("SkillStore closed")
async def __aenter__(self):
    """Enter ``async with`` — setup already happened in ``__init__``."""
    return self

async def __aexit__(self, *exc):
    # Always close, whether or not the body raised.
    self.close()
@property
def db_path(self) -> Path:
    """Filesystem path of the backing SQLite database file."""
    return self._db_path
def _ensure_open(self) -> None:
    # Fail fast: every public operation calls this after close().
    if self._closed:
        raise RuntimeError("SkillStore is closed")
# Write API (async, offloaded via asyncio.to_thread)
async def save_record(self, record: SkillRecord) -> None:
    """Upsert a single :class:`SkillRecord` (blocking work runs in a worker thread)."""
    await asyncio.to_thread(self._save_record_sync, record)
async def save_records(self, records: List[SkillRecord]) -> None:
    """Batch upsert all *records* in a single transaction."""
    await asyncio.to_thread(self._save_records_sync, records)
async def sync_from_registry(
    self,
    discovered_skills: List[Any],
) -> int:
    """Ensure every discovered skill has an initial DB record.

    For each skill in *discovered_skills* (``SkillMeta`` objects from
    :meth:`SkillRegistry.discover`), if no record with the same
    ``skill_id`` already exists, a new :class:`SkillRecord` is created
    (``origin=IMPORTED``, ``generation=0``). Existing records
    (including evolved ones) are left untouched.

    Args:
        discovered_skills: List of ``SkillMeta`` objects.

    Returns:
        Number of new records created.
    """
    return await asyncio.to_thread(
        self._sync_from_registry_sync, discovered_skills,
    )
@_db_retry()
def _sync_from_registry_sync(
    self, discovered_skills: List[Any],
) -> int:
    """Worker for :meth:`sync_from_registry` — one transaction under the write lock.

    Creates generation-0 IMPORTED records for new skills, refreshes
    name/description of known ones, and backfills empty content
    snapshots. Returns the number of records *created*.
    """
    self._ensure_open()
    created = 0
    refreshed = 0
    with self._mu:
        self._conn.execute("BEGIN")
        try:
            # Fetch all existing records keyed by skill_id.
            rows = self._conn.execute(
                "SELECT skill_id, name, description, "
                "lineage_content_snapshot "
                "FROM skill_records"
            ).fetchall()
            existing: Dict[str, Any] = {r[0]: r for r in rows}
            # Also fetch all paths with an active record.
            # After FIX evolution the DB skill_id changes but the
            # filesystem path stays the same. Matching by path
            # prevents creating a duplicate imported record on restart.
            path_rows = self._conn.execute(
                "SELECT path FROM skill_records WHERE is_active=1"
            ).fetchall()
            existing_active_paths: set = {r[0] for r in path_rows}
            for meta in discovered_skills:
                path_str = str(meta.path)
                skill_dir = meta.path.parent
                if meta.skill_id in existing:
                    # Refresh name/description if frontmatter changed,
                    # and backfill an empty content_snapshot.
                    row = existing[meta.skill_id]
                    updates: List[str] = []
                    params: list = []
                    if row["name"] != meta.name:
                        updates.append("name=?")
                        params.append(meta.name)
                    if row["description"] != meta.description:
                        updates.append("description=?")
                        params.append(meta.description)
                    raw_snap = row["lineage_content_snapshot"] or ""
                    if raw_snap in ("", "{}"):
                        try:
                            snap = collect_skill_snapshot(skill_dir)
                            if snap:
                                updates.append("lineage_content_snapshot=?")
                                params.append(json.dumps(snap, ensure_ascii=False))
                                # Synthesize a creation diff ("" -> file text)
                                # per snapshotted file, in filename order.
                                diff = "\n".join(
                                    compute_unified_diff("", text, filename=name)
                                    for name, text in sorted(snap.items())
                                    if compute_unified_diff("", text, filename=name)
                                )
                                if diff:
                                    updates.append("lineage_content_diff=?")
                                    params.append(diff)
                        except Exception as e:
                            # Best-effort backfill: never abort the whole sync.
                            logger.warning(
                                f"sync_from_registry: snapshot backfill failed "
                                f"for {meta.skill_id}: {e}"
                            )
                    if updates:
                        params.append(meta.skill_id)
                        self._conn.execute(
                            f"UPDATE skill_records SET {', '.join(updates)} "
                            f"WHERE skill_id=?",
                            params,
                        )
                        refreshed += 1
                    continue
                # Path already covered by an evolved record.
                if path_str in existing_active_paths:
                    continue
                # Snapshot the directory so this version can be restored later.
                snapshot: Dict[str, str] = {}
                content_diff = ""
                try:
                    snapshot = collect_skill_snapshot(skill_dir)
                    content_diff = "\n".join(
                        compute_unified_diff("", text, filename=name)
                        for name, text in sorted(snapshot.items())
                        if compute_unified_diff("", text, filename=name)
                    )
                except Exception as e:
                    logger.warning(
                        f"sync_from_registry: failed to snapshot {skill_dir}: {e}"
                    )
                record = SkillRecord(
                    skill_id=meta.skill_id,
                    name=meta.name,
                    description=meta.description,
                    path=path_str,
                    is_active=True,
                    lineage=SkillLineage(
                        origin=SkillOrigin.IMPORTED,
                        generation=0,
                        content_snapshot=snapshot,
                        content_diff=content_diff,
                    ),
                )
                self._upsert(record)
                created += 1
                logger.debug(
                    f"sync_from_registry: created {meta.name} [{meta.skill_id}]"
                )
            self._conn.commit()
        except Exception:
            self._conn.rollback()
            raise
    if created or refreshed:
        logger.info(
            f"sync_from_registry: {created} new record(s) created, "
            f"{refreshed} refreshed, "
            f"{len(discovered_skills) - created - refreshed} unchanged"
        )
    return created
async def record_analysis(self, analysis: ExecutionAnalysis) -> None:
    """Atomic observation: insert analysis + judgments + increment counters.

    1. INSERT a row in ``execution_analyses`` (one per task).
    2. INSERT rows in ``skill_judgments`` for each skill assessed.
    3. For each judgment, atomically increment the matching
       ``skill_records`` counters:
           - total_selections  += 1  (always)
           - total_applied     += 1  (if skill_applied)
           - total_completions += 1  (if applied and completed)
           - total_fallbacks   += 1  (if not applied and not completed)
           - last_updated = now
    """
    await asyncio.to_thread(self._record_analysis_sync, analysis)
async def evolve_skill(
    self,
    new_record: SkillRecord,
    parent_skill_ids: List[str],
) -> None:
    """Atomic evolution: insert new version + deactivate old version.

    **FIXED** — same-name skill fix:
        - ``new_record.name`` is the same as the parent's
        - ``new_record.path`` is the same as the parent's
        - parent is set to ``is_active=False``
        - ``new_record.is_active=True``

    **DERIVED** — new skill derived:
        - ``new_record.name`` is a new name
        - parent stays ``is_active=True`` (it is still the latest
          version of its own line)
        - ``new_record.is_active=True``

    Both steps run in one SQL transaction, serialized by ``self._mu``.

    Args:
        new_record: New version record; ``lineage.parent_skill_ids``
            must be non-empty.
        parent_skill_ids: Parent skill_id list (FIXED exactly 1,
            DERIVED >= 1). For FIXED, parents are auto-deactivated.
    """
    await asyncio.to_thread(
        self._evolve_skill_sync, new_record, parent_skill_ids
    )
async def deactivate_record(self, skill_id: str) -> bool:
    """Set ``is_active=False`` for *skill_id*; True if a row was updated."""
    return await asyncio.to_thread(self._deactivate_record_sync, skill_id)

async def reactivate_record(self, skill_id: str) -> bool:
    """Set ``is_active=True`` (revert / rollback); True if a row was updated."""
    return await asyncio.to_thread(self._reactivate_record_sync, skill_id)

async def delete_record(self, skill_id: str) -> bool:
    """Delete a skill and all CASCADE-linked rows; True if a row was deleted."""
    return await asyncio.to_thread(self._delete_record_sync, skill_id)
# Sync write implementations (thread-safe via self._mu)
@_db_retry()
def _save_record_sync(self, record: SkillRecord) -> None:
    """Upsert one record in its own transaction; rolls back on failure."""
    self._ensure_open()
    with self._mu:
        self._conn.execute("BEGIN")
        try:
            self._upsert(record)
            self._conn.commit()
        except Exception:
            self._conn.rollback()
            raise
@_db_retry()
def _save_records_sync(self, records: List[SkillRecord]) -> None:
    """Upsert all *records* atomically — all succeed or none persist."""
    self._ensure_open()
    with self._mu:
        self._conn.execute("BEGIN")
        try:
            for r in records:
                self._upsert(r)
            self._conn.commit()
        except Exception:
            self._conn.rollback()
            raise
@_db_retry()
def _record_analysis_sync(self, analysis: ExecutionAnalysis) -> None:
    """Persist an analysis, its per-skill judgments, and update counters.

    ``SkillJudgment.skill_id`` is the **true skill_id** (e.g.
    ``weather__imp_a1b2c3d4``), the same identifier used as the DB
    primary key, so counter updates match on ``WHERE skill_id = ?``
    exactly — no ambiguity.
    """
    self._ensure_open()
    with self._mu:
        self._conn.execute("BEGIN")
        try:
            analysis_id = self._insert_analysis(analysis)
            now_iso = datetime.now().isoformat()
            for j in analysis.skill_judgments:
                applied = 1 if j.skill_applied else 0
                completed = (
                    1
                    if (j.skill_applied and analysis.task_completed)
                    else 0
                )
                fallback = (
                    1
                    if (not j.skill_applied and not analysis.task_completed)
                    else 0
                )
                # FIX: judgment rows were never written even though the
                # docstring promises them and load_analyses() filters
                # through skill_judgments — analysis_id was computed but
                # unused. OR REPLACE keeps this idempotent under the
                # UNIQUE(analysis_id, skill_id) constraint (harmless if
                # _insert_analysis ever writes them too — TODO confirm).
                self._conn.execute(
                    "INSERT OR REPLACE INTO skill_judgments "
                    "(analysis_id, skill_id, skill_applied, note) "
                    "VALUES (?, ?, ?, ?)",
                    (analysis_id, j.skill_id, applied, j.note or ""),
                )
                self._conn.execute(
                    """
                    UPDATE skill_records SET
                    total_selections = total_selections + 1,
                    total_applied = total_applied + ?,
                    total_completions = total_completions + ?,
                    total_fallbacks = total_fallbacks + ?,
                    last_updated = ?
                    WHERE skill_id = ?
                    """,
                    (applied, completed, fallback, now_iso, j.skill_id),
                )
            self._conn.commit()
        except Exception:
            self._conn.rollback()
            raise
@_db_retry()
def _evolve_skill_sync(
    self,
    new_record: SkillRecord,
    parent_skill_ids: List[str],
) -> None:
    """Atomic: insert the new version + deactivate parents (FIXED only)."""
    self._ensure_open()
    with self._mu:
        self._conn.execute("BEGIN")
        try:
            # For FIXED: deactivate same-name parents — the new record
            # supersedes them at the same filesystem path.
            if new_record.lineage.origin == SkillOrigin.FIXED:
                for pid in parent_skill_ids:
                    self._conn.execute(
                        "UPDATE skill_records SET is_active=0, "
                        "last_updated=? WHERE skill_id=?",
                        (datetime.now().isoformat(), pid),
                    )
            # Ensure the new record carries its parent references.
            new_record.lineage.parent_skill_ids = list(parent_skill_ids)
            new_record.is_active = True
            self._upsert(new_record)
            self._conn.commit()
            origin = new_record.lineage.origin.value
            logger.info(
                f"evolve_skill ({origin}): "
                f"{new_record.name}@gen{new_record.lineage.generation} "
                f"[{new_record.skill_id}] ← parents={parent_skill_ids}"
            )
        except Exception:
            self._conn.rollback()
            raise
@_db_retry()
def _deactivate_record_sync(self, skill_id: str) -> bool:
    """Mark *skill_id* inactive; True if a row matched."""
    return self._set_active_flag(skill_id, 0)

@_db_retry()
def _reactivate_record_sync(self, skill_id: str) -> bool:
    """Mark *skill_id* active again (revert); True if a row matched."""
    return self._set_active_flag(skill_id, 1)

def _set_active_flag(self, skill_id: str, flag: int) -> bool:
    # Shared worker for the two methods above, which previously
    # duplicated the same UPDATE verbatim. `flag` is an internal 0/1
    # constant, never user input, so the f-string is injection-safe.
    self._ensure_open()
    with self._mu:
        cur = self._conn.execute(
            f"UPDATE skill_records SET is_active={flag}, last_updated=? "
            "WHERE skill_id=?",
            (datetime.now().isoformat(), skill_id),
        )
        self._conn.commit()
        return cur.rowcount > 0
@_db_retry()
def _delete_record_sync(self, skill_id: str) -> bool:
    """Hard-delete a skill record; True if a row was deleted."""
    self._ensure_open()
    with self._mu:
        # ON DELETE CASCADE automatically cleans up lineage_parents / deps / tags.
        # skill_judgments are NOT cascade-deleted (no FK to skill_records),
        # so historical judgments survive skill deletion by design.
        cur = self._conn.execute(
            "DELETE FROM skill_records WHERE skill_id=?", (skill_id,)
        )
        self._conn.commit()
        return cur.rowcount > 0
# Read API (sync, each call opens its own read-only conn)
@_db_retry()
def load_record(self, skill_id: str) -> Optional[SkillRecord]:
    """Fetch one :class:`SkillRecord` by id, or None if unknown."""
    with self._reader() as conn:
        row = conn.execute(
            "SELECT * FROM skill_records WHERE skill_id=?",
            (skill_id,),
        ).fetchone()
        if row is None:
            return None
        return self._to_record(conn, row)
@_db_retry()
def load_all(
    self, *, active_only: bool = False
) -> Dict[str, SkillRecord]:
    """Load skill records keyed by ``skill_id``.

    Args:
        active_only: If True, only records with ``is_active=1``.
    """
    query = "SELECT * FROM skill_records"
    if active_only:
        query += " WHERE is_active=1"
    with self._reader() as conn:
        records = self._to_records_bulk(conn, conn.execute(query).fetchall())
        result: Dict[str, SkillRecord] = {}
        for rec in records:
            result[rec.skill_id] = rec
        logger.info(f"Loaded {len(result)} skill records (active_only={active_only})")
        return result
@_db_retry()
def load_active(self) -> Dict[str, SkillRecord]:
    """Load only active skill records, keyed by ``skill_id``.

    Convenience wrapper for ``load_all(active_only=True)``.
    """
    return self.load_all(active_only=True)
@_db_retry()
def load_record_by_path(self, skill_dir: str) -> Optional[SkillRecord]:
    """Load the newest active SkillRecord whose ``path`` lies inside *skill_dir*.

    Used by ``upload_skill`` to retrieve pre-computed upload metadata
    (origin, parents, change_summary, etc.) from the DB when
    ``.upload_meta.json`` is missing.

    Both ``/a/b/SKILL.md`` and ``/a/b/scenarios/x.md`` match ``/a/b``.
    Returns the newest active record (by ``last_updated DESC``).
    """
    normalized = skill_dir.rstrip("/")
    # FIX: the old bare "{dir}%" LIKE pattern let '/a/b' match
    # '/a/bc/SKILL.md' (prefix collision) and treated '%'/'_' in the
    # directory name as wildcards. Escape the wildcards and require a
    # '/' boundary after the directory; also accept an exact match.
    escaped = (
        normalized.replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )
    with self._reader() as conn:
        row = conn.execute(
            "SELECT * FROM skill_records "
            "WHERE (path = ? OR path LIKE ? ESCAPE '\\') AND is_active=1 "
            "ORDER BY last_updated DESC LIMIT 1",
            (normalized, f"{escaped}/%"),
        ).fetchone()
        return self._to_record(conn, row) if row else None
@_db_retry()
def get_versions(self, name: str) -> List[SkillRecord]:
    """All versions of a named skill (active + inactive), oldest generation first."""
    sql = (
        "SELECT * FROM skill_records WHERE name=? "
        "ORDER BY lineage_generation ASC"
    )
    with self._reader() as conn:
        rows = conn.execute(sql, (name,)).fetchall()
        return self._to_records_bulk(conn, rows)
@_db_retry()
def load_by_category(
    self, category: SkillCategory, *, active_only: bool = True
) -> List[SkillRecord]:
    """Skill records in *category*.

    Args:
        active_only: If True (default), only active records.
    """
    sql = "SELECT * FROM skill_records WHERE category=?"
    if active_only:
        sql += " AND is_active=1"
    with self._reader() as conn:
        rows = conn.execute(sql, (category.value,)).fetchall()
        return self._to_records_bulk(conn, rows)
@_db_retry()
def load_analyses(
    self,
    skill_id: Optional[str] = None,
    limit: int = 50,
) -> List[ExecutionAnalysis]:
    """Recent analyses, returned oldest-first within the newest-N window.

    Args:
        skill_id: True ``skill_id`` (e.g. ``weather__imp_a1b2c3d4``);
            ``skill_judgments.skill_id`` stores the true id, so the
            filter is an exact match. If None, returns pure-execution
            analyses (those with no judgments at all).
        limit: Maximum number of newest analyses considered.
    """
    with self._reader() as conn:
        if skill_id is None:
            rows = conn.execute(
                "SELECT ea.* FROM execution_analyses ea "
                "LEFT JOIN skill_judgments sj ON ea.id = sj.analysis_id "
                "WHERE sj.id IS NULL "
                "ORDER BY ea.timestamp DESC LIMIT ?",
                (limit,),
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT ea.* FROM execution_analyses ea "
                "JOIN skill_judgments sj ON ea.id = sj.analysis_id "
                "WHERE sj.skill_id = ? "
                "ORDER BY ea.timestamp DESC LIMIT ?",
                (skill_id, limit),
            ).fetchall()
        rows.reverse()  # newest-N window, presented chronologically
        return [self._to_analysis(conn, r) for r in rows]
@_db_retry()
def load_analyses_for_task(
    self, task_id: str
) -> Optional[ExecutionAnalysis]:
    """The analysis recorded for *task_id* (task_id is UNIQUE), or None."""
    with self._reader() as conn:
        row = conn.execute(
            "SELECT * FROM execution_analyses WHERE task_id=?",
            (task_id,),
        ).fetchone()
        if row is None:
            return None
        return self._to_analysis(conn, row)
@_db_retry()
def load_all_analyses(self, limit: int = 200) -> List[ExecutionAnalysis]:
    """The newest *limit* analyses across all tasks, oldest-first."""
    with self._reader() as conn:
        rows = conn.execute(
            "SELECT * FROM execution_analyses "
            "ORDER BY timestamp DESC LIMIT ?",
            (limit,),
        ).fetchall()
        rows.reverse()
        return [self._to_analysis(conn, r) for r in rows]
@_db_retry()
def load_evolution_candidates(
    self, limit: int = 50
) -> List[ExecutionAnalysis]:
    """Analyses flagged ``candidate_for_evolution``, oldest-first within newest-N."""
    with self._reader() as conn:
        rows = conn.execute(
            "SELECT * FROM execution_analyses "
            "WHERE candidate_for_evolution=1 "
            "ORDER BY timestamp DESC LIMIT ?",
            (limit,),
        ).fetchall()
        rows.reverse()
        return [self._to_analysis(conn, r) for r in rows]
@_db_retry()
def find_skills_by_tool(self, tool_key: str) -> List[str]:
    """skill_ids of ACTIVE skills that depend on *tool_key*.

    Deactivated (superseded) versions are excluded so that Trigger 2
    never re-processes old versions.
    """
    sql = (
        "SELECT sd.skill_id "
        "FROM skill_tool_deps sd "
        "JOIN skill_records sr ON sd.skill_id = sr.skill_id "
        "WHERE sd.tool_key=? AND sr.is_active=1"
    )
    with self._reader() as conn:
        return [
            row["skill_id"]
            for row in conn.execute(sql, (tool_key,)).fetchall()
        ]
@_db_retry()
def find_children(self, parent_skill_id: str) -> List[str]:
    """skill_ids directly derived from *parent_skill_id*."""
    sql = (
        "SELECT skill_id FROM skill_lineage_parents "
        "WHERE parent_skill_id=?"
    )
    with self._reader() as conn:
        return [
            row["skill_id"]
            for row in conn.execute(sql, (parent_skill_id,)).fetchall()
        ]
@_db_retry()
def count(self, *, active_only: bool = False) -> int:
    """Number of skill records, optionally restricted to active ones."""
    sql = "SELECT COUNT(*) FROM skill_records"
    if active_only:
        sql += " WHERE is_active=1"
    with self._reader() as conn:
        return conn.execute(sql).fetchone()[0]
# Analytics / Summary
@_db_retry()
def get_summary(self, *, active_only: bool = True) -> List[Dict[str, Any]]:
    """Lightweight per-skill summary rows (no analyses/deps loaded).

    Defaults to active skills only; newest-updated first.
    """
    where = "WHERE is_active=1 " if active_only else ""
    query = f"""
    SELECT skill_id, name, description, category, is_active,
    visibility, creator_id,
    lineage_origin, lineage_generation,
    total_selections, total_applied,
    total_completions, total_fallbacks,
    first_seen, last_updated
    FROM skill_records
    {where}
    ORDER BY last_updated DESC
    """
    with self._reader() as conn:
        return [dict(row) for row in conn.execute(query).fetchall()]
@_db_retry()
def get_stats(self, *, active_only: bool = True) -> Dict[str, Any]:
    """Aggregate statistics: counts, category/origin breakdowns, usage totals."""
    where = " WHERE is_active=1" if active_only else ""
    with self._reader() as conn:
        total = conn.execute(
            f"SELECT COUNT(*) FROM skill_records{where}"
        ).fetchone()[0]
        by_category: Dict[str, int] = {}
        for row in conn.execute(
            f"SELECT category, COUNT(*) AS cnt "
            f"FROM skill_records{where} GROUP BY category"
        ).fetchall():
            by_category[row["category"]] = row["cnt"]
        by_origin: Dict[str, int] = {}
        for row in conn.execute(
            f"SELECT lineage_origin, COUNT(*) AS cnt "
            f"FROM skill_records{where} GROUP BY lineage_origin"
        ).fetchall():
            by_origin[row["lineage_origin"]] = row["cnt"]
        n_analyses = conn.execute(
            "SELECT COUNT(*) FROM execution_analyses"
        ).fetchone()[0]
        n_candidates = conn.execute(
            "SELECT COUNT(*) FROM execution_analyses "
            "WHERE candidate_for_evolution=1"
        ).fetchone()[0]
        agg = conn.execute(
            f"""
            SELECT SUM(total_selections) AS sel,
            SUM(total_applied) AS app,
            SUM(total_completions) AS comp,
            SUM(total_fallbacks) AS fb
            FROM skill_records{where}
            """
        ).fetchone()
        # Grand total (including inactive) for context.
        total_all = conn.execute(
            "SELECT COUNT(*) FROM skill_records"
        ).fetchone()[0]
        return {
            "total_skills": total,
            "total_skills_all": total_all,
            "by_category": by_category,
            "by_origin": by_origin,
            "total_analyses": n_analyses,
            "evolution_candidates": n_candidates,
            # SUM() yields NULL on an empty table — coerce to 0.
            "total_selections": agg["sel"] or 0,
            "total_applied": agg["app"] or 0,
            "total_completions": agg["comp"] or 0,
            "total_fallbacks": agg["fb"] or 0,
        }
@_db_retry()
def get_task_skill_summary(self, task_id: str) -> Dict[str, Any]:
    """Per-task summary: task-level fields plus per-skill judgments.

    Useful for understanding how multiple skills contributed to one
    task execution.

    Returns:
        dict with ``task_id``, ``task_completed``, ``execution_note``,
        ``tool_issues``, ``judgments`` (list of
        ``{skill_id, skill_applied, note}``) etc., or an empty dict if
        the task has no analysis.
    """
    with self._reader() as conn:
        ea = conn.execute(
            "SELECT * FROM execution_analyses WHERE task_id=?",
            (task_id,),
        ).fetchone()
        if not ea:
            return {}
        jrows = conn.execute(
            "SELECT skill_id, skill_applied, note "
            "FROM skill_judgments WHERE analysis_id=?",
            (ea["id"],),
        ).fetchall()
    try:
        suggestions = json.loads(ea["evolution_suggestions"] or "[]")
    except json.JSONDecodeError:
        # Tolerate corrupt JSON rather than failing the whole summary.
        suggestions = []
    judgments = [
        {
            "skill_id": jr["skill_id"],
            "skill_applied": bool(jr["skill_applied"]),
            "note": jr["note"],
        }
        for jr in jrows
    ]
    return {
        "task_id": ea["task_id"],
        "timestamp": ea["timestamp"],
        "task_completed": bool(ea["task_completed"]),
        "execution_note": ea["execution_note"],
        "tool_issues": json.loads(ea["tool_issues"]),
        "candidate_for_evolution": bool(ea["candidate_for_evolution"]),
        "evolution_suggestions": suggestions,
        "analyzed_by": ea["analyzed_by"],
        "judgments": judgments,
    }
@_db_retry()
def get_top_skills(
    self,
    n: int = 10,
    metric: str = "effective_rate",
    min_selections: int = 1,
    *,
    active_only: bool = True,
) -> List[Dict[str, Any]]:
    """Top-N skills ranked by the chosen metric.

    Metrics:
        ``effective_rate``   — completions / selections
        ``applied_rate``     — applied / selections
        ``completion_rate``  — completions / applied (0 if never applied)
        ``total_selections`` — raw count
    Unknown metric names fall back to ``effective_rate``.
    """
    rate_exprs = {
        "effective_rate": (
            "CAST(total_completions AS REAL) / total_selections"
        ),
        "applied_rate": (
            "CAST(total_applied AS REAL) / total_selections"
        ),
        "completion_rate": (
            "CASE WHEN total_applied > 0 "
            "THEN CAST(total_completions AS REAL) / total_applied "
            "ELSE 0.0 END"
        ),
        "total_selections": "total_selections",
    }
    expr = rate_exprs.get(metric, rate_exprs["effective_rate"])
    active_clause = " AND is_active=1" if active_only else ""
    sql = (
        f"SELECT *, ({expr}) AS _rank "
        f"FROM skill_records "
        f"WHERE total_selections >= ?{active_clause} "
        f"ORDER BY _rank DESC LIMIT ?"
    )
    with self._reader() as conn:
        ranked = conn.execute(sql, (min_selections, n)).fetchall()
    out: List[Dict[str, Any]] = []
    for row in ranked:
        entry = dict(row)
        entry.pop("_rank", None)  # internal ranking column only
        out.append(entry)
    return out
@_db_retry()
def get_count_and_timestamp(
    self, *, active_only: bool = True
) -> Dict[str, Any]:
    """Skill count + newest ``last_updated`` — a cheap change-detection probe."""
    where = " WHERE is_active=1" if active_only else ""
    sql = (
        f"SELECT COUNT(*) AS cnt, MAX(last_updated) AS max_ts "
        f"FROM skill_records{where}"
    )
    with self._reader() as conn:
        row = conn.execute(sql).fetchone()
    if not row:
        return {"count": 0, "max_last_updated": None}
    return {"count": row["cnt"], "max_last_updated": row["max_ts"]}
# Lineage / Ancestry
@_db_retry()
def get_ancestry(
    self, skill_id: str, max_depth: int = 10
) -> List[SkillRecord]:
    """Walk up the lineage tree; returns ancestors sorted oldest-first.

    Breadth-first walk bounded by *max_depth* levels; ``visited``
    guards against cycles in the parent graph.
    """
    with self._reader() as conn:
        visited: set[str] = set()
        ancestors: List[SkillRecord] = []
        frontier = [skill_id]
        for _ in range(max_depth):
            next_frontier: List[str] = []
            for sid in frontier:
                for pr in conn.execute(
                    "SELECT parent_skill_id "
                    "FROM skill_lineage_parents WHERE skill_id=?",
                    (sid,),
                ).fetchall():
                    pid = pr["parent_skill_id"]
                    if pid in visited:
                        continue
                    visited.add(pid)
                    # Parent may have been hard-deleted; skip silently.
                    row = conn.execute(
                        "SELECT * FROM skill_records WHERE skill_id=?",
                        (pid,),
                    ).fetchone()
                    if row:
                        ancestors.append(self._to_record(conn, row))
                    next_frontier.append(pid)
            frontier = next_frontier
            if not frontier:
                break
        # Oldest generation first.
        ancestors.sort(key=lambda r: r.lineage.generation)
        return ancestors
@_db_retry()
def get_lineage_tree(
    self, skill_id: str, max_depth: int = 5
) -> Dict[str, Any]:
    """Return a JSON-friendly descendant tree rooted at *skill_id*.

    Each node carries skill metadata plus a ``children`` list; the
    recursion is capped at *max_depth* levels.
    """
    seen: set = set()
    with self._reader() as conn:
        return self._subtree(conn, skill_id, max_depth, seen)
def _subtree(
self,
conn: sqlite3.Connection,
sid: str,
depth: int,
visited: set,
) -> Dict[str, Any]:
visited.add(sid)
row = conn.execute(
"SELECT skill_id, name, lineage_generation, lineage_origin, is_active "
"FROM skill_records WHERE skill_id=?",
(sid,),
).fetchone()
node: Dict[str, Any] = {
"skill_id": sid,
"name": row["name"] if row else "?",
"generation": row["lineage_generation"] if row else -1,
"origin": row["lineage_origin"] if row else "unknown",
"is_active": bool(row["is_active"]) if row else False,
"children": [],
}
if depth <= 0:
return node
for cr in conn.execute(
"SELECT skill_id FROM skill_lineage_parents "
"WHERE parent_skill_id=?",
(sid,),
).fetchall():
cid = cr["skill_id"]
if cid not in visited:
node["children"].append(
self._subtree(conn, cid, depth - 1, visited)
)
return node
# Maintenance
def clear(self) -> None:
    """Wipe all rows from every table; the schema itself is untouched."""
    self._ensure_open()
    with self._mu:
        db = self._conn
        db.execute("BEGIN")
        try:
            # Deleting from skill_records cascades to lineage_parents,
            # tool_deps and tags; execution_analyses cascades to
            # skill_judgments.
            for table in ("skill_records", "execution_analyses"):
                db.execute(f"DELETE FROM {table}")
            db.commit()
            logger.info("SkillStore cleared")
        except Exception:
            db.rollback()
            raise
def vacuum(self) -> None:
    """Run SQLite ``VACUUM`` to compact the database file.

    Serialized through ``self._mu`` like every other write path.
    """
    self._ensure_open()
    with self._mu:
        self._conn.execute("VACUUM")
# Internal: Upsert / Insert / Deserialize
def _upsert(self, record: SkillRecord) -> None:
    """Insert or update skill_records + sync related rows.

    Called within a transaction holding ``self._mu``.

    The main row is written with an ``INSERT ... ON CONFLICT`` upsert;
    ``first_seen`` is deliberately absent from the UPDATE set so it
    keeps its original value on updates.  Lineage parents, tool deps
    and tags are synced with a delete-then-reinsert pass, so the DB
    ends up mirroring the in-memory lists exactly.  Analyses are
    append-only and deduplicated by ``task_id``.

    Args:
        record: The in-memory SkillRecord to persist.
    """
    lin = record.lineage
    # content_snapshot is Dict[str, str]; store as JSON text
    snapshot_json = json.dumps(
        lin.content_snapshot, ensure_ascii=False
    )
    self._conn.execute(
        """
        INSERT INTO skill_records (
            skill_id, name, description, path, is_active, category,
            visibility, creator_id,
            lineage_origin, lineage_generation,
            lineage_source_task_id, lineage_change_summary,
            lineage_content_diff, lineage_content_snapshot,
            lineage_created_at, lineage_created_by,
            total_selections, total_applied,
            total_completions, total_fallbacks,
            first_seen, last_updated
        ) VALUES (?,?,?,?,?,?, ?,?, ?,?, ?,?, ?,?, ?,?, ?,?,?,?, ?,?)
        ON CONFLICT(skill_id) DO UPDATE SET
            name=excluded.name,
            description=excluded.description,
            path=excluded.path,
            is_active=excluded.is_active,
            category=excluded.category,
            visibility=excluded.visibility,
            creator_id=excluded.creator_id,
            lineage_origin=excluded.lineage_origin,
            lineage_generation=excluded.lineage_generation,
            lineage_source_task_id=excluded.lineage_source_task_id,
            lineage_change_summary=excluded.lineage_change_summary,
            lineage_content_diff=excluded.lineage_content_diff,
            lineage_content_snapshot=excluded.lineage_content_snapshot,
            lineage_created_at=excluded.lineage_created_at,
            lineage_created_by=excluded.lineage_created_by,
            total_selections=excluded.total_selections,
            total_applied=excluded.total_applied,
            total_completions=excluded.total_completions,
            total_fallbacks=excluded.total_fallbacks,
            last_updated=excluded.last_updated
        """,
        (
            record.skill_id,
            record.name,
            record.description,
            record.path,
            int(record.is_active),          # bool -> 0/1
            record.category.value,          # enums stored by value
            record.visibility.value,
            record.creator_id,
            lin.origin.value,
            lin.generation,
            lin.source_task_id,
            lin.change_summary,
            lin.content_diff,
            snapshot_json,
            lin.created_at.isoformat(),     # datetimes stored as ISO-8601 text
            lin.created_by,
            record.total_selections,
            record.total_applied,
            record.total_completions,
            record.total_fallbacks,
            record.first_seen.isoformat(),
            record.last_updated.isoformat(),
        ),
    )
    # Sync lineage parents
    self._conn.execute(
        "DELETE FROM skill_lineage_parents WHERE skill_id=?",
        (record.skill_id,),
    )
    for pid in lin.parent_skill_ids:
        self._conn.execute(
            "INSERT INTO skill_lineage_parents"
            "(skill_id, parent_skill_id) VALUES(?,?)",
            (record.skill_id, pid),
        )
    # Sync tool dependencies
    self._conn.execute(
        "DELETE FROM skill_tool_deps WHERE skill_id=?",
        (record.skill_id,),
    )
    # `critical` is a per-dependency flag derived from membership in
    # record.critical_tools.
    critical_set = set(record.critical_tools)
    for tk in record.tool_dependencies:
        self._conn.execute(
            "INSERT INTO skill_tool_deps"
            "(skill_id, tool_key, critical) VALUES(?,?,?)",
            (record.skill_id, tk, 1 if tk in critical_set else 0),
        )
    # Sync tags
    self._conn.execute(
        "DELETE FROM skill_tags WHERE skill_id=?",
        (record.skill_id,),
    )
    for tag in record.tags:
        self._conn.execute(
            "INSERT INTO skill_tags(skill_id, tag) VALUES(?,?)",
            (record.skill_id, tag),
        )
    # Sync analyses (insert only NEW ones, dedup by task_id)
    for a in record.recent_analyses:
        existing = self._conn.execute(
            "SELECT id FROM execution_analyses WHERE task_id=?",
            (a.task_id,),
        ).fetchone()
        if existing is None:
            self._insert_analysis(a)
def _insert_analysis(self, a: ExecutionAnalysis) -> int:
    """Insert an execution_analyses row + its skill_judgments.

    Called within a transaction holding ``self._mu``.

    List/object fields (``tool_issues``, ``evolution_suggestions``)
    are serialized to JSON text, booleans to 0/1, datetimes to
    ISO-8601 strings.

    Args:
        a: The analysis to persist.

    Returns:
        int: The ``execution_analyses.id`` of the newly inserted row.
    """
    cur = self._conn.execute(
        """
        INSERT INTO execution_analyses (
            task_id, timestamp,
            task_completed, execution_note,
            tool_issues, candidate_for_evolution,
            evolution_suggestions, analyzed_by, analyzed_at
        ) VALUES (?,?, ?,?, ?,?, ?,?,?)
        """,
        (
            a.task_id,
            a.timestamp.isoformat(),
            int(a.task_completed),          # bool -> 0/1
            a.execution_note,
            json.dumps(a.tool_issues, ensure_ascii=False),
            int(a.candidate_for_evolution),
            # Suggestions are stored as a JSON array of dicts.
            json.dumps(
                [s.to_dict() for s in a.evolution_suggestions],
                ensure_ascii=False,
            ),
            a.analyzed_by,
            a.analyzed_at.isoformat(),
        ),
    )
    # AUTOINCREMENT primary key of the row just inserted; child
    # judgments reference it via analysis_id.
    analysis_id = cur.lastrowid
    for j in a.skill_judgments:
        self._conn.execute(
            "INSERT INTO skill_judgments "
            "(analysis_id, skill_id, skill_applied, note) "
            "VALUES (?,?,?,?)",
            (analysis_id, j.skill_id, int(j.skill_applied), j.note),
        )
    return analysis_id
# Deserialization
def _to_records_bulk(
    self, conn: sqlite3.Connection, rows: List[sqlite3.Row]
) -> List[SkillRecord]:
    """Bulk deserialize skill_records rows -> List[SkillRecord].

    Batches the related-table lookups (parents, tool deps, tags,
    analyses) into one ``IN (...)`` query per table per chunk instead
    of four queries per skill, chunking the id lists to stay under
    SQLite's bound-parameter limit.

    Args:
        conn: Open connection whose rows support access by column name.
        rows: Rows selected from ``skill_records``.

    Returns:
        One fully populated SkillRecord per input row, in the same
        order.  ``recent_analyses`` is ordered oldest-first, matching
        :meth:`_to_record`.
    """
    if not rows:
        return []
    sids = [r["skill_id"] for r in rows]
    parents_map: Dict[str, List[str]] = {sid: [] for sid in sids}
    deps_map: Dict[str, List[sqlite3.Row]] = {sid: [] for sid in sids}
    tags_map: Dict[str, List[str]] = {sid: [] for sid in sids}
    analyses_map: Dict[str, List[ExecutionAnalysis]] = {
        sid: [] for sid in sids
    }

    def chunks(lst, n):
        # SQLite caps bound parameters (historically 999); chunks of
        # 900 keep each IN (...) list safely below the limit.
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    for chunk in chunks(sids, 900):
        placeholders = ",".join(["?"] * len(chunk))
        chunk_tuple = tuple(chunk)
        # Parents
        p_rows = conn.execute(
            f"SELECT skill_id, parent_skill_id FROM skill_lineage_parents WHERE skill_id IN ({placeholders})",
            chunk_tuple,
        ).fetchall()
        for pr in p_rows:
            parents_map[pr["skill_id"]].append(pr["parent_skill_id"])
        # Tool deps
        d_rows = conn.execute(
            f"SELECT skill_id, tool_key, critical FROM skill_tool_deps WHERE skill_id IN ({placeholders})",
            chunk_tuple,
        ).fetchall()
        for dr in d_rows:
            deps_map[dr["skill_id"]].append(dr)
        # Tags
        t_rows = conn.execute(
            f"SELECT skill_id, tag FROM skill_tags WHERE skill_id IN ({placeholders})",
            chunk_tuple,
        ).fetchall()
        for tr in t_rows:
            tags_map[tr["skill_id"]].append(tr["tag"])
        # Analyses: newest-first so the per-skill cap below retains the
        # most recent MAX_RECENT entries.
        a_rows = conn.execute(
            f"""
            SELECT ea.*, sj.skill_id as sj_skill_id
            FROM execution_analyses ea
            JOIN skill_judgments sj ON ea.id = sj.analysis_id
            WHERE sj.skill_id IN ({placeholders})
            ORDER BY ea.timestamp DESC
            """,
            chunk_tuple,
        ).fetchall()
        for ar in a_rows:
            sid = ar["sj_skill_id"]
            if len(analyses_map[sid]) < SkillRecord.MAX_RECENT:
                analyses_map[sid].append(self._to_analysis(conn, ar))
    # BUGFIX: the query above collects analyses newest-first, but
    # _to_record() returns recent_analyses oldest-first (it applies
    # reversed() to a DESC-ordered result).  Reverse each list so both
    # deserialization paths produce the same chronological ordering.
    for lst in analyses_map.values():
        lst.reverse()
    result: List[SkillRecord] = []
    for row in rows:
        sid = row["skill_id"]
        # content_snapshot is a JSON object mapping relative file paths
        # to file text; tolerate NULL from legacy rows.
        raw_snapshot = row["lineage_content_snapshot"] or "{}"
        snapshot: Dict[str, str] = json.loads(raw_snapshot)
        lineage = SkillLineage(
            origin=SkillOrigin(row["lineage_origin"]),
            generation=row["lineage_generation"],
            parent_skill_ids=parents_map[sid],
            source_task_id=row["lineage_source_task_id"],
            change_summary=row["lineage_change_summary"],
            content_diff=row["lineage_content_diff"],
            content_snapshot=snapshot,
            created_at=datetime.fromisoformat(row["lineage_created_at"]),
            created_by=row["lineage_created_by"],
        )
        deps = deps_map[sid]
        record = SkillRecord(
            skill_id=sid,
            name=row["name"],
            description=row["description"],
            path=row["path"],
            is_active=bool(row["is_active"]),
            category=SkillCategory(row["category"]),
            tags=tags_map[sid],
            visibility=(
                SkillVisibility(row["visibility"])
                if row["visibility"] else SkillVisibility.PRIVATE
            ),
            creator_id=row["creator_id"] or "",
            lineage=lineage,
            tool_dependencies=[r["tool_key"] for r in deps],
            critical_tools=[
                r["tool_key"] for r in deps if r["critical"]
            ],
            total_selections=row["total_selections"],
            total_applied=row["total_applied"],
            total_completions=row["total_completions"],
            total_fallbacks=row["total_fallbacks"],
            recent_analyses=analyses_map[sid],
            first_seen=datetime.fromisoformat(row["first_seen"]),
            last_updated=datetime.fromisoformat(row["last_updated"]),
        )
        result.append(record)
    return result
def _to_record(
    self, conn: sqlite3.Connection, row: sqlite3.Row
) -> SkillRecord:
    """Deserialize a skill_records row + related rows -> SkillRecord.

    Issues one query per related table (parents, tool deps, tags,
    analyses); use the bulk path when hydrating many rows at once.
    """
    sid = row["skill_id"]
    parents = [
        r["parent_skill_id"]
        for r in conn.execute(
            "SELECT parent_skill_id "
            "FROM skill_lineage_parents WHERE skill_id=?",
            (sid,),
        ).fetchall()
    ]
    # Deserialize content_snapshot: stored as JSON dict
    # mapping relative file paths to their text content
    # (NULL in legacy rows -> empty dict).
    raw_snapshot = row["lineage_content_snapshot"] or "{}"
    snapshot: Dict[str, str] = json.loads(raw_snapshot)
    lineage = SkillLineage(
        origin=SkillOrigin(row["lineage_origin"]),
        generation=row["lineage_generation"],
        parent_skill_ids=parents,
        source_task_id=row["lineage_source_task_id"],
        change_summary=row["lineage_change_summary"],
        content_diff=row["lineage_content_diff"],
        content_snapshot=snapshot,
        created_at=datetime.fromisoformat(row["lineage_created_at"]),
        created_by=row["lineage_created_by"],
    )
    dep_rows = conn.execute(
        "SELECT tool_key, critical "
        "FROM skill_tool_deps WHERE skill_id=?",
        (sid,),
    ).fetchall()
    tag_rows = conn.execute(
        "SELECT tag FROM skill_tags WHERE skill_id=?", (sid,)
    ).fetchall()
    # Load recent analyses involving this skill (via skill_judgments).
    # skill_judgments.skill_id stores the true skill_id (same as DB PK).
    # DESC + LIMIT picks the newest MAX_RECENT entries.
    analysis_rows = conn.execute(
        "SELECT ea.* FROM execution_analyses ea "
        "JOIN skill_judgments sj ON ea.id = sj.analysis_id "
        "WHERE sj.skill_id = ? "
        "ORDER BY ea.timestamp DESC LIMIT ?",
        (sid, SkillRecord.MAX_RECENT),
    ).fetchall()
    return SkillRecord(
        skill_id=sid,
        name=row["name"],
        description=row["description"],
        path=row["path"],
        is_active=bool(row["is_active"]),
        category=SkillCategory(row["category"]),
        tags=[r["tag"] for r in tag_rows],
        # NULL/empty visibility from legacy rows defaults to PRIVATE.
        visibility=(
            SkillVisibility(row["visibility"])
            if row["visibility"] else SkillVisibility.PRIVATE
        ),
        creator_id=row["creator_id"] or "",
        lineage=lineage,
        tool_dependencies=[r["tool_key"] for r in dep_rows],
        critical_tools=[
            r["tool_key"] for r in dep_rows if r["critical"]
        ],
        total_selections=row["total_selections"],
        total_applied=row["total_applied"],
        total_completions=row["total_completions"],
        total_fallbacks=row["total_fallbacks"],
        # reversed() flips the DESC query result, so recent_analyses is
        # chronological (oldest-first).
        recent_analyses=[
            self._to_analysis(conn, r) for r in reversed(analysis_rows)
        ],
        first_seen=datetime.fromisoformat(row["first_seen"]),
        last_updated=datetime.fromisoformat(row["last_updated"]),
    )
@staticmethod
def _to_analysis(
    conn: sqlite3.Connection, row: sqlite3.Row
) -> ExecutionAnalysis:
    """Rebuild an ExecutionAnalysis from its DB row plus its judgments."""
    judgments = [
        SkillJudgment(
            skill_id=j["skill_id"],
            skill_applied=bool(j["skill_applied"]),
            note=j["note"],
        )
        for j in conn.execute(
            "SELECT skill_id, skill_applied, note "
            "FROM skill_judgments WHERE analysis_id=?",
            (row["id"],),
        ).fetchall()
    ]
    parsed: list[EvolutionSuggestion] = []
    raw = row["evolution_suggestions"]
    if raw:
        try:
            parsed = [
                EvolutionSuggestion.from_dict(item)
                for item in json.loads(raw)
            ]
        except (json.JSONDecodeError, KeyError, ValueError):
            # Malformed or legacy payloads degrade to an empty list
            # rather than failing the whole deserialization.
            pass
    return ExecutionAnalysis(
        task_id=row["task_id"],
        timestamp=datetime.fromisoformat(row["timestamp"]),
        task_completed=bool(row["task_completed"]),
        execution_note=row["execution_note"],
        tool_issues=json.loads(row["tool_issues"]),
        skill_judgments=judgments,
        evolution_suggestions=parsed,
        analyzed_by=row["analyzed_by"],
        analyzed_at=datetime.fromisoformat(row["analyzed_at"]),
    )