| | import os
|
| | import sqlite3
|
| | from contextlib import contextmanager
|
| | from datetime import datetime
|
| | from pathlib import Path
|
| | from statistics import median
|
| | from typing import Optional, Dict, Any, List, Tuple
|
| |
|
| |
|
| |
|
| | DEFAULT_DB_PATH = None
|
| |
|
| |
|
| | USE_BLOCKCHAIN_FOR_EVENTS = False
|
| |
|
| |
|
| | FEEDBACK_DB_PATH = Path(__file__).resolve().parent / "temp" / "db" / "feedback.db"
|
| |
|
| |
|
| | CAPTIONS_DB_PATH = Path(__file__).resolve().parent / "temp" / "db" / "captions.db"
|
| |
|
| |
|
| | VIDEOS_DB_PATH = Path(__file__).resolve().parent / "temp" / "db" / "videos.db"
|
| |
|
| |
|
| | AUDIODESCRIPTIONS_DB_PATH = Path(__file__).resolve().parent / "temp" / "db" / "audiodescriptions.db"
|
| |
|
| |
|
| | ACTIONS_DB_PATH = Path(__file__).resolve().parent / "temp" / "db" / "actions.db"
|
| |
|
| |
|
| | CASTING_DB_PATH = Path(__file__).resolve().parent / "temp" / "db" / "casting.db"
|
| | SCENARIOS_DB_PATH = Path(__file__).resolve().parent / "temp" / "db" / "scenarios.db"
|
| |
|
| |
|
| | def set_db_path(db_path: str):
|
| | global DEFAULT_DB_PATH
|
| | DEFAULT_DB_PATH = db_path
|
| | os.makedirs(os.path.dirname(db_path), exist_ok=True)
|
| |
|
| |
|
| | def set_blockchain_enabled(enabled: bool) -> None:
|
| | """Activa o desactiva l'ús de blockchain per registrar esdeveniments.
|
| |
|
| | Quan està desactivat (per defecte), els esdeveniments es registren a
|
| | demo/temp/events.db. Quan està activat, s'envien a aws_qldb.
|
| | """
|
| |
|
| | global USE_BLOCKCHAIN_FOR_EVENTS
|
| | USE_BLOCKCHAIN_FOR_EVENTS = bool(enabled)
|
| |
|
| |
|
| | def get_connection():
|
| | if not DEFAULT_DB_PATH:
|
| | raise ValueError("Database path not set. Call set_db_path(path) first.")
|
| | return sqlite3.connect(DEFAULT_DB_PATH)
|
| |
|
| |
|
| | @contextmanager
|
| | def get_conn(db_path: Optional[str] = None):
|
| | path = db_path or DEFAULT_DB_PATH
|
| | conn = sqlite3.connect(path, check_same_thread=False)
|
| | conn.row_factory = sqlite3.Row
|
| | try:
|
| | yield conn
|
| | conn.commit()
|
| | finally:
|
| | conn.close()
|
| |
|
| |
|
| | def init_schema():
|
| | with get_conn() as conn:
|
| | c = conn.cursor()
|
| |
|
| | c.execute(
|
| | """
|
| | CREATE TABLE IF NOT EXISTS users (
|
| | id INTEGER PRIMARY KEY AUTOINCREMENT,
|
| | username TEXT UNIQUE NOT NULL,
|
| | password_hash TEXT,
|
| | role TEXT NOT NULL,
|
| | created_at TEXT NOT NULL
|
| | );
|
| | """
|
| | )
|
| |
|
| | try:
|
| | c.execute("PRAGMA table_info(users)")
|
| | cols = {row[1] for row in c.fetchall()}
|
| | if "password_hash" not in cols:
|
| | c.execute("ALTER TABLE users ADD COLUMN password_hash TEXT")
|
| | if "role" not in cols:
|
| | c.execute("ALTER TABLE users ADD COLUMN role TEXT NOT NULL DEFAULT 'verd'")
|
| | if "created_at" not in cols:
|
| | c.execute("ALTER TABLE users ADD COLUMN created_at TEXT NOT NULL DEFAULT ''")
|
| | except sqlite3.OperationalError:
|
| | pass
|
| |
|
| | try:
|
| | c.execute("ALTER TABLE users DROP COLUMN pw_hash;")
|
| | except sqlite3.OperationalError:
|
| | pass
|
| |
|
| |
|
| |
|
| | c.execute(
|
| | """
|
| | CREATE TABLE IF NOT EXISTS feedback_ad (
|
| | id INTEGER PRIMARY KEY AUTOINCREMENT,
|
| | video_name TEXT NOT NULL, -- nombre de carpeta dentro de videos/completed
|
| | user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
| | transcripcio INTEGER NOT NULL, -- 1..10
|
| | identificacio INTEGER NOT NULL, -- 1..10
|
| | localitzacions INTEGER NOT NULL, -- 1..10
|
| | activitats INTEGER NOT NULL, -- 1..10
|
| | narracions INTEGER NOT NULL, -- 1..10
|
| | expressivitat INTEGER NOT NULL, -- 1..10
|
| | comments TEXT,
|
| | created_at TEXT NOT NULL
|
| | );
|
| | """
|
| | )
|
| |
|
| | try:
|
| | c.execute(
|
| | "ALTER TABLE feedback_ad ADD COLUMN expressivitat INTEGER NOT NULL DEFAULT 7;"
|
| | )
|
| | except sqlite3.OperationalError:
|
| | pass
|
| |
|
| |
|
| | def add_feedback_ad(
|
| | video_name: str,
|
| | user_id: int,
|
| | transcripcio: int,
|
| | identificacio: int,
|
| | localitzacions: int,
|
| | activitats: int,
|
| | narracions: int,
|
| | expressivitat: int,
|
| | comments: str | None,
|
| | ):
|
| | with get_conn() as conn:
|
| | conn.execute(
|
| | """INSERT INTO feedback_ad
|
| | (video_name, user_id, transcripcio, identificacio, localitzacions, activitats, narracions, expressivitat, comments, created_at)
|
| | VALUES (?,?,?,?,?,?,?,?,?,?)""",
|
| | (
|
| | video_name,
|
| | user_id,
|
| | transcripcio,
|
| | identificacio,
|
| | localitzacions,
|
| | activitats,
|
| | narracions,
|
| | expressivitat,
|
| | comments,
|
| | now_str(),
|
| | ),
|
| | )
|
| |
|
| |
|
| | def get_feedback_ad_for_video(video_name: str):
|
| | with get_conn() as conn:
|
| | cur = conn.execute(
|
| | """SELECT * FROM feedback_ad WHERE video_name=? ORDER BY created_at DESC""",
|
| | (video_name,),
|
| | )
|
| | return cur.fetchall()
|
| |
|
| |
|
| | def get_accessible_videos_for_session(session_id: str | None) -> List[str]:
|
| | """Retorna els noms de vídeo accessibles per a una sessió.
|
| |
|
| | Regles:
|
| | - Sempre inclou vídeos amb visibility='public' a videos.db.
|
| | - Afegeix vídeos per als quals el camp owner coincideix amb algun phone
|
| | registrat a events.db per a la mateixa session.
|
| |
|
| | Args:
|
| | session_id: Identificador de sessió (st.session_state.session_id).
|
| | """
|
| |
|
| |
|
| | public_videos: set[str] = set()
|
| | with _connect_videos_db() as vconn:
|
| | try:
|
| | for row in vconn.execute(
|
| | "SELECT DISTINCT video_name FROM videos WHERE visibility = 'public'"
|
| | ):
|
| | public_videos.add(row["video_name"])
|
| | except sqlite3.OperationalError:
|
| |
|
| | return []
|
| |
|
| | if not session_id:
|
| | return sorted(public_videos)
|
| |
|
| |
|
| | phones: set[str] = set()
|
| | with _connect_actions_db() as aconn:
|
| | for row in aconn.execute(
|
| | "SELECT DISTINCT phone FROM actions WHERE session = ? AND phone IS NOT NULL AND phone != ''",
|
| | (session_id,),
|
| | ):
|
| | phones.add(row["phone"])
|
| |
|
| | if not phones:
|
| | return sorted(public_videos)
|
| |
|
| |
|
| | owner_videos: set[str] = set()
|
| | with _connect_videos_db() as vconn:
|
| | q_marks = ",".join("?" for _ in phones)
|
| | params: Tuple[Any, ...] = tuple(phones)
|
| | query = (
|
| | f"SELECT DISTINCT video_name FROM videos WHERE owner IN ({q_marks})"
|
| | )
|
| | for row in vconn.execute(query, params):
|
| | owner_videos.add(row["video_name"])
|
| |
|
| | all_videos = public_videos | owner_videos
|
| | return sorted(all_videos)
|
| |
|
| |
|
| | def get_accessible_videos_with_sha1(session_id: str | None) -> List[Dict[str, Any]]:
|
| | """Retorna vídeos accessibles amb el seu sha1sum des de demo/temp/videos.db.
|
| |
|
| | Aplica les mateixes regles que get_accessible_videos_for_session, però
|
| | retorna diccionaris amb almenys les claus: video_name i sha1sum.
|
| | """
|
| |
|
| |
|
| | public_rows: Dict[str, Dict[str, Any]] = {}
|
| | with _connect_videos_db() as vconn:
|
| | try:
|
| | for row in vconn.execute(
|
| | "SELECT video_name, sha1sum FROM videos WHERE visibility = 'public'"
|
| | ):
|
| | key = row["video_name"] or row["sha1sum"]
|
| | public_rows[key] = {"video_name": row["video_name"], "sha1sum": row["sha1sum"]}
|
| | except sqlite3.OperationalError:
|
| |
|
| | return []
|
| |
|
| | if not session_id:
|
| |
|
| | return sorted(public_rows.values(), key=lambda r: (r["video_name"] or r["sha1sum"]))
|
| |
|
| |
|
| | phones: set[str] = set()
|
| | with _connect_actions_db() as aconn:
|
| | for row in aconn.execute(
|
| | "SELECT DISTINCT phone FROM actions WHERE session = ? AND phone IS NOT NULL AND phone != ''",
|
| | (session_id,),
|
| | ):
|
| | phones.add(row["phone"])
|
| |
|
| | if not phones:
|
| | return sorted(public_rows.values(), key=lambda r: (r["video_name"] or r["sha1sum"]))
|
| |
|
| |
|
| | owner_rows: Dict[str, Dict[str, Any]] = {}
|
| | with _connect_videos_db() as vconn:
|
| | q_marks = ",".join("?" for _ in phones)
|
| | params: Tuple[Any, ...] = tuple(phones)
|
| | query = f"SELECT video_name, sha1sum FROM videos WHERE owner IN ({q_marks})"
|
| | for row in vconn.execute(query, params):
|
| | key = row["video_name"] or row["sha1sum"]
|
| | owner_rows[key] = {"video_name": row["video_name"], "sha1sum": row["sha1sum"]}
|
| |
|
| | all_rows: Dict[str, Dict[str, Any]] = {**public_rows, **owner_rows}
|
| | return sorted(all_rows.values(), key=lambda r: (r["video_name"] or r["sha1sum"]))
|
| |
|
| |
|
| | def _connect_feedback_db() -> sqlite3.Connection:
|
| | """Connexió directa a demo/data/feedback.db.
|
| |
|
| | És independent de DEFAULT_DB_PATH perquè aquesta BD és específica de feedback
|
| | agregat importat des de engine.
|
| | """
|
| |
|
| | FEEDBACK_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
| | conn = sqlite3.connect(str(FEEDBACK_DB_PATH))
|
| | conn.row_factory = sqlite3.Row
|
| | return conn
|
| |
|
| |
|
| | def _connect_actions_db() -> sqlite3.Connection:
|
| | ACTIONS_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
| | conn = sqlite3.connect(str(ACTIONS_DB_PATH))
|
| | conn.row_factory = sqlite3.Row
|
| | return conn
|
| |
|
| |
|
| | def get_latest_user_phone_for_session(session_id: str) -> Tuple[str, str]:
|
| | if not session_id:
|
| | return "", ""
|
| |
|
| | try:
|
| | with _connect_actions_db() as conn:
|
| | cur = conn.execute(
|
| | "SELECT user, phone FROM actions "
|
| | "WHERE session = ? AND (user IS NOT NULL OR phone IS NOT NULL) "
|
| | "ORDER BY id DESC LIMIT 1",
|
| | (session_id,),
|
| | )
|
| | row = cur.fetchone()
|
| | if not row:
|
| | return "", ""
|
| | u = row["user"] if row["user"] is not None else ""
|
| | p = row["phone"] if row["phone"] is not None else ""
|
| | return str(u), str(p)
|
| | except sqlite3.OperationalError:
|
| | return "", ""
|
| |
|
| |
|
| | def insert_action(
|
| | *,
|
| | session: str,
|
| | user: str,
|
| | phone: str,
|
| | action: str,
|
| | sha1sum: str,
|
| | timestamp: Optional[str] = None,
|
| | ) -> None:
|
| | ts = timestamp or datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
|
| |
|
| | try:
|
| | with _connect_actions_db() as conn:
|
| | cur = conn.cursor()
|
| | cur.execute(
|
| | """
|
| | CREATE TABLE IF NOT EXISTS actions (
|
| | id INTEGER PRIMARY KEY AUTOINCREMENT,
|
| | timestamp TEXT NOT NULL,
|
| | action TEXT NOT NULL,
|
| | session TEXT,
|
| | user TEXT,
|
| | phone TEXT,
|
| | sha1sum TEXT
|
| | );
|
| | """
|
| | )
|
| |
|
| | cur.execute(
|
| | """INSERT INTO actions
|
| | (timestamp, action, session, user, phone, sha1sum)
|
| | VALUES (?,?,?,?,?,?)""",
|
| | (ts, action, session or "", user or "", phone or "", sha1sum or ""),
|
| | )
|
| | except sqlite3.OperationalError:
|
| | return
|
| |
|
| |
|
| | def get_video_owner_by_sha1(sha1sum: str) -> str:
|
| | """Retorna el telèfon (owner) associat a un sha1sum a videos.db, o "".
|
| |
|
| | Cerca a demo/temp/db/videos.db una fila amb aquest sha1sum i retorna el
|
| | camp owner si existeix.
|
| | """
|
| |
|
| | if not sha1sum:
|
| | return ""
|
| |
|
| | try:
|
| | with _connect_videos_db() as conn:
|
| | cur = conn.execute(
|
| | "SELECT owner FROM videos WHERE sha1sum = ? LIMIT 1",
|
| | (sha1sum,),
|
| | )
|
| | row = cur.fetchone()
|
| | if not row:
|
| | return ""
|
| | owner = row["owner"] if "owner" in row.keys() else None
|
| | return str(owner or "")
|
| | except sqlite3.OperationalError:
|
| | return ""
|
| |
|
| |
|
| | def can_revoke_video_for_session(sha1sum: str, session_id: str) -> bool:
|
| | if not sha1sum or not session_id:
|
| | return False
|
| |
|
| | visibility_val = ""
|
| | owner_val = ""
|
| |
|
| | try:
|
| | with _connect_videos_db() as conn:
|
| | cur = conn.execute(
|
| | "SELECT visibility, owner FROM videos WHERE sha1sum = ? LIMIT 1",
|
| | (sha1sum,),
|
| | )
|
| | row = cur.fetchone()
|
| | if not row:
|
| | return False
|
| | visibility_val = str(row["visibility"] or "").strip().lower()
|
| | owner_val = str(row["owner"] or "").strip()
|
| | except sqlite3.OperationalError:
|
| | return False
|
| |
|
| | if visibility_val not in {"private", "privat"}:
|
| | return False
|
| |
|
| | phones: set[str] = set()
|
| | try:
|
| | with _connect_actions_db() as conn:
|
| | cur = conn.execute(
|
| | "SELECT DISTINCT phone FROM actions WHERE session = ? AND phone IS NOT NULL AND phone != ''",
|
| | (session_id,),
|
| | )
|
| | for row in cur.fetchall():
|
| | phones.add(str(row["phone"]))
|
| | except sqlite3.OperationalError:
|
| | return False
|
| |
|
| | if not phones:
|
| | return False
|
| |
|
| | return owner_val in phones
|
| |
|
| |
|
| | def is_video_input_ok(sha1sum: str) -> bool:
|
| | """Retorna True si el vídeo té status='input-OK' a videos.db per a aquest sha1sum."""
|
| |
|
| | if not sha1sum:
|
| | return False
|
| |
|
| | try:
|
| | with _connect_videos_db() as conn:
|
| | cur = conn.execute(
|
| | "SELECT status FROM videos WHERE sha1sum = ? LIMIT 1",
|
| | (sha1sum,),
|
| | )
|
| | row = cur.fetchone()
|
| | if not row:
|
| | return False
|
| | status = row["status"] if "status" in row.keys() else None
|
| | return str(status or "").strip().lower() == "input-ok"
|
| | except sqlite3.OperationalError:
|
| | return False
|
| |
|
| |
|
| | def ensure_video_row_for_upload(
|
| | *,
|
| | sha1sum: str,
|
| | video_name: str,
|
| | owner_phone: str,
|
| | status: str = "input-pending",
|
| | visibility: str | None = None,
|
| | ) -> None:
|
| | if not sha1sum:
|
| | return
|
| |
|
| | try:
|
| | with _connect_videos_db() as conn:
|
| | cur = conn.cursor()
|
| |
|
| | try:
|
| | cur.execute("PRAGMA table_info(videos)")
|
| | cols = {row[1] for row in cur.fetchall()}
|
| | except sqlite3.OperationalError:
|
| | return
|
| |
|
| | alter_stmts = []
|
| | if "owner" not in cols:
|
| | alter_stmts.append("ALTER TABLE videos ADD COLUMN owner TEXT")
|
| | if "status" not in cols:
|
| | alter_stmts.append("ALTER TABLE videos ADD COLUMN status TEXT")
|
| | if "sha1sum" not in cols:
|
| | alter_stmts.append("ALTER TABLE videos ADD COLUMN sha1sum TEXT")
|
| | if "visibility" not in cols:
|
| | alter_stmts.append("ALTER TABLE videos ADD COLUMN visibility TEXT")
|
| |
|
| | for stmt in alter_stmts:
|
| | try:
|
| | cur.execute(stmt)
|
| | except sqlite3.OperationalError:
|
| | continue
|
| |
|
| | row = cur.execute(
|
| | "SELECT id FROM videos WHERE sha1sum = ? LIMIT 1",
|
| | (sha1sum,),
|
| | ).fetchone()
|
| | if row is not None:
|
| | return
|
| |
|
| | vis = visibility or "private"
|
| | cur.execute(
|
| | "INSERT INTO videos (video_name, owner, visibility, sha1sum, status) "
|
| | "VALUES (?,?,?,?,?)",
|
| | (video_name or sha1sum, owner_phone or "", vis, sha1sum, status),
|
| | )
|
| | except sqlite3.OperationalError:
|
| | return
|
| |
|
| |
|
| | def update_video_status(sha1sum: str, status: str) -> None:
|
| | """Actualitza el camp status d'un vídeo existent a videos.db per sha1sum.
|
| |
|
| | Si la taula o el registre no existeixen, no fa res.
|
| | """
|
| |
|
| | if not sha1sum:
|
| | return
|
| |
|
| | try:
|
| | with _connect_videos_db() as conn:
|
| | cur = conn.cursor()
|
| |
|
| | try:
|
| | cur.execute("PRAGMA table_info(videos)")
|
| | cols = {row[1] for row in cur.fetchall()}
|
| | except sqlite3.OperationalError:
|
| | return
|
| |
|
| |
|
| | alter_stmts: list[str] = []
|
| | if "status" not in cols:
|
| | alter_stmts.append("ALTER TABLE videos ADD COLUMN status TEXT")
|
| | if "sha1sum" not in cols:
|
| | alter_stmts.append("ALTER TABLE videos ADD COLUMN sha1sum TEXT")
|
| |
|
| | for stmt in alter_stmts:
|
| | try:
|
| | cur.execute(stmt)
|
| | except sqlite3.OperationalError:
|
| | continue
|
| |
|
| | cur.execute(
|
| | "UPDATE videos SET status = ? WHERE sha1sum = ?",
|
| | (status, sha1sum),
|
| | )
|
| | except sqlite3.OperationalError:
|
| | return
|
| |
|
| |
|
| | def get_videos_by_status(status: str) -> List[Dict[str, Any]]:
|
| | """Retorna llista de vídeos a videos.db amb un status concret.
|
| |
|
| | Cada element és un dict amb com a mínim: sha1sum, video_name.
|
| | Si la taula o les columnes no existeixen, retorna [].
|
| | """
|
| |
|
| | if not status:
|
| | return []
|
| |
|
| | try:
|
| | with _connect_videos_db() as conn:
|
| | cur = conn.cursor()
|
| | try:
|
| | cur.execute("PRAGMA table_info(videos)")
|
| | cols = {row[1] for row in cur.fetchall()}
|
| | except sqlite3.OperationalError:
|
| | return []
|
| |
|
| | if "status" not in cols or "sha1sum" not in cols:
|
| | return []
|
| |
|
| |
|
| | has_video_name = "video_name" in cols
|
| | select_sql = (
|
| | "SELECT sha1sum, video_name FROM videos WHERE status = ?"
|
| | if has_video_name
|
| | else "SELECT sha1sum, sha1sum AS video_name FROM videos WHERE status = ?"
|
| | )
|
| |
|
| | results: List[Dict[str, Any]] = []
|
| | for row in cur.execute(select_sql, (status,)):
|
| | sha1 = str(row[0]) if row[0] is not None else ""
|
| | vname = str(row[1]) if row[1] is not None else sha1
|
| | if not sha1:
|
| | continue
|
| | results.append({"sha1sum": sha1, "video_name": vname})
|
| | return results
|
| | except sqlite3.OperationalError:
|
| | return []
|
| |
|
| |
|
| | def _connect_audiodescriptions_db() -> sqlite3.Connection:
|
| | """Connexió directa a demo/temp/audiodescriptions.db.
|
| |
|
| | Es fa servir per llegir i actualitzar AD (une_ad, free_ad, eval, mètriques, etc.).
|
| | """
|
| |
|
| | AUDIODESCRIPTIONS_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
| | conn = sqlite3.connect(str(AUDIODESCRIPTIONS_DB_PATH))
|
| | conn.row_factory = sqlite3.Row
|
| | return conn
|
| |
|
| |
|
| | def get_audiodescription(sha1sum: str, version: str) -> Optional[sqlite3.Row]:
|
| | """Retorna una fila d'audiodescriptions per sha1sum i version, o None.
|
| |
|
| | Si la taula no existeix, retorna None silenciosament.
|
| | """
|
| |
|
| | try:
|
| | with _connect_audiodescriptions_db() as conn:
|
| | cur = conn.execute(
|
| | "SELECT * FROM audiodescriptions WHERE sha1sum=? AND version=?",
|
| | (sha1sum, version),
|
| | )
|
| | return cur.fetchone()
|
| | except sqlite3.OperationalError:
|
| |
|
| | return None
|
| |
|
| |
|
| | def get_videos_from_audiodescriptions(
|
| | session_id: str | None = None,
|
| | role: str | None = None,
|
| | ) -> List[Dict[str, Any]]:
|
| | """Retorna vídeos disponibles segons audiodescriptions.db i videos.db.
|
| |
|
| | 1) Llegeix demo/temp/db/audiodescriptions.db i obté els DISTINCT sha1sum.
|
| | 2) Per a cada sha1sum, mira a demo/temp/db/videos.db quin és el video_name.
|
| | Si no el troba, fa servir el sha1sum com a fallback.
|
| | """
|
| |
|
| | sha1_list: List[str] = []
|
| | try:
|
| | with _connect_audiodescriptions_db() as conn:
|
| | cur = conn.cursor()
|
| | cur.execute("SELECT DISTINCT sha1sum FROM audiodescriptions")
|
| | rows = cur.fetchall()
|
| | sha1_list = [str(r["sha1sum"]) for r in rows if r["sha1sum"]]
|
| | except sqlite3.OperationalError:
|
| |
|
| | return []
|
| |
|
| | if not sha1_list:
|
| | return []
|
| |
|
| |
|
| | session_phone: str = ""
|
| | if session_id:
|
| | try:
|
| | _, session_phone = get_latest_user_phone_for_session(session_id)
|
| | except sqlite3.OperationalError:
|
| | session_phone = ""
|
| |
|
| |
|
| | result: List[Dict[str, Any]] = []
|
| | try:
|
| | with _connect_videos_db() as vconn:
|
| | vcur = vconn.cursor()
|
| | for sha1 in sha1_list:
|
| | try:
|
| | row = vcur.execute(
|
| | "SELECT video_name, visibility, owner FROM videos WHERE sha1sum = ? LIMIT 1",
|
| | (sha1,),
|
| | ).fetchone()
|
| | if row is None:
|
| |
|
| | continue
|
| |
|
| | vname = str(row["video_name"]) if row["video_name"] else ""
|
| | visibility_val = str(row["visibility"] or "").strip().lower()
|
| | owner_val = str(row["owner"] or "").strip()
|
| |
|
| | if not vname:
|
| |
|
| | continue
|
| | except sqlite3.OperationalError:
|
| |
|
| | continue
|
| |
|
| |
|
| | if vname == sha1:
|
| | continue
|
| |
|
| |
|
| | is_public = visibility_val == "public"
|
| | is_private = visibility_val == "private" or not visibility_val
|
| |
|
| | allowed = False
|
| | if is_public:
|
| | allowed = True
|
| | else:
|
| |
|
| | if role in {"verd", "blau"}:
|
| |
|
| | allowed = True
|
| | elif role == "groc":
|
| |
|
| | if session_phone and owner_val and session_phone == owner_val:
|
| | allowed = True
|
| | else:
|
| |
|
| | allowed = False
|
| |
|
| | if not allowed:
|
| | continue
|
| |
|
| | result.append({"sha1sum": sha1, "video_name": vname})
|
| | except sqlite3.OperationalError:
|
| |
|
| | result = []
|
| |
|
| | return result
|
| |
|
| |
|
| | def get_audiodescription_history(sha1sum: str, version: str) -> list[sqlite3.Row]:
|
| | """Retorna només la darrera fila d'audiodescriptions per sha1sum+version.
|
| |
|
| | Abans es feien servir múltiples files per representar diferents etapes
|
| | (original, HITL OK, HITL Test). Ara aquests estats viuen en diferents
|
| | columnes d'un únic registre, de manera que només ens interessa l'última
|
| | versió desada per a (sha1sum, version).
|
| | """
|
| |
|
| | try:
|
| | sql = (
|
| | "SELECT * FROM audiodescriptions "
|
| | "WHERE sha1sum = ? AND LOWER(version) = LOWER(?) "
|
| | "ORDER BY rowid DESC "
|
| | "LIMIT 1"
|
| | )
|
| |
|
| | with _connect_audiodescriptions_db() as conn:
|
| | row = conn.execute(sql, (sha1sum, version)).fetchone()
|
| | return [row] if row is not None else []
|
| | except sqlite3.OperationalError:
|
| | return []
|
| |
|
| |
|
| | def update_audiodescription_text(
|
| | sha1sum: str,
|
| | version: str,
|
| | *,
|
| | free_ad: Optional[str] = None,
|
| | une_ad: Optional[str] = None,
|
| | ok_free_ad: Optional[str] = None,
|
| | ok_une_ad: Optional[str] = None,
|
| | test_free_ad: Optional[str] = None,
|
| | test_une_ad: Optional[str] = None,
|
| | ) -> None:
|
| | """Actualitza camps de text per a una AD existent.
|
| |
|
| | Si no existeix el registre, no fa res (per simplicitat).
|
| | """
|
| |
|
| | try:
|
| | with _connect_audiodescriptions_db() as conn:
|
| | row = conn.execute(
|
| | "SELECT 1 FROM audiodescriptions WHERE sha1sum=? AND version=?",
|
| | (sha1sum, version),
|
| | ).fetchone()
|
| | if not row:
|
| | return
|
| |
|
| | fields = []
|
| | values: List[Any] = []
|
| |
|
| |
|
| | if free_ad is not None:
|
| | fields.append("free_ad = ?")
|
| | values.append(free_ad)
|
| | if une_ad is not None:
|
| | fields.append("une_ad = ?")
|
| | values.append(une_ad)
|
| |
|
| |
|
| | if ok_free_ad is not None:
|
| | fields.append("ok_free_ad = ?")
|
| | values.append(ok_free_ad)
|
| | if ok_une_ad is not None:
|
| | fields.append("ok_une_ad = ?")
|
| | values.append(ok_une_ad)
|
| | if test_free_ad is not None:
|
| | fields.append("test_free_ad = ?")
|
| | values.append(test_free_ad)
|
| | if test_une_ad is not None:
|
| | fields.append("test_une_ad = ?")
|
| | values.append(test_une_ad)
|
| |
|
| | if not fields:
|
| | return
|
| |
|
| | values.extend([sha1sum, version])
|
| | sql = f"UPDATE audiodescriptions SET {', '.join(fields)} WHERE sha1sum=? AND version=?"
|
| | conn.execute(sql, tuple(values))
|
| | except sqlite3.OperationalError:
|
| |
|
| | return
|
| |
|
| |
|
| | def update_audiodescription_info_ad(
|
| | sha1sum: str,
|
| | version: str,
|
| | info_ad: str,
|
| | ) -> None:
|
| | """Actualitza el camp info_ad per a una AD existent.
|
| |
|
| | Si la taula o el registre no existeixen, no fa res.
|
| | """
|
| |
|
| | try:
|
| | with _connect_audiodescriptions_db() as conn:
|
| | row = conn.execute(
|
| | "SELECT 1 FROM audiodescriptions WHERE sha1sum=? AND version=?",
|
| | (sha1sum, version),
|
| | ).fetchone()
|
| | if not row:
|
| | return
|
| |
|
| | conn.execute(
|
| | "UPDATE audiodescriptions SET info_ad = ? WHERE sha1sum=? AND version=?",
|
| | (info_ad, sha1sum, version),
|
| | )
|
| | except sqlite3.OperationalError:
|
| | return
|
| |
|
| |
|
| | def upsert_audiodescription_text(
|
| | sha1sum: str,
|
| | version: str,
|
| | *,
|
| | free_ad: Optional[str] = None,
|
| | une_ad: Optional[str] = None,
|
| | ) -> None:
|
| | """Crea o actualitza un registre d'audiodescripció per sha1sum+version.
|
| |
|
| | - Si la taula no existeix, es crea amb un esquema bàsic.
|
| | - Si no hi ha registre per (sha1sum, version), s'insereix.
|
| | - Si ja existeix, s'actualitzen els camps de text.
|
| | """
|
| |
|
| | try:
|
| | with _connect_audiodescriptions_db() as conn:
|
| | cur = conn.cursor()
|
| |
|
| | cur.execute(
|
| | """
|
| | CREATE TABLE IF NOT EXISTS audiodescriptions (
|
| | id INTEGER PRIMARY KEY AUTOINCREMENT,
|
| | sha1sum TEXT NOT NULL,
|
| | version TEXT NOT NULL,
|
| | une_ad TEXT,
|
| | free_ad TEXT,
|
| | created_at TEXT,
|
| | updated_at TEXT
|
| | );
|
| | """
|
| | )
|
| |
|
| | row = cur.execute(
|
| | "SELECT id FROM audiodescriptions WHERE sha1sum=? AND version=?",
|
| | (sha1sum, version),
|
| | ).fetchone()
|
| |
|
| | now = now_str()
|
| | if row is None:
|
| | cur.execute(
|
| | """
|
| | INSERT INTO audiodescriptions
|
| | (sha1sum, version, une_ad, free_ad, created_at, updated_at)
|
| | VALUES (?, ?, ?, ?, ?, ?)
|
| | """,
|
| | (sha1sum, version, une_ad or "", free_ad or "", now, now),
|
| | )
|
| | else:
|
| | fields = []
|
| | values: list[Any] = []
|
| | if une_ad is not None:
|
| | fields.append("une_ad = ?")
|
| | values.append(une_ad)
|
| | if free_ad is not None:
|
| | fields.append("free_ad = ?")
|
| | values.append(free_ad)
|
| |
|
| | if not fields:
|
| | return
|
| |
|
| | fields.append("updated_at = ?")
|
| | values.append(now)
|
| | values.extend([sha1sum, version])
|
| |
|
| | sql = f"UPDATE audiodescriptions SET {', '.join(fields)} WHERE sha1sum=? AND version=?"
|
| | cur.execute(sql, tuple(values))
|
| | except sqlite3.OperationalError:
|
| |
|
| | return
|
| |
|
| |
|
| | def _connect_captions_db() -> sqlite3.Connection:
|
| | """Connexió a demo/data/captions.db i creació de la taula si cal.
|
| |
|
| | Estructura:
|
| | - variable TEXT PRIMARY KEY (p.ex. "score_1")
|
| | - caption TEXT (etiqueta humana)
|
| | """
|
| |
|
| | CAPTIONS_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
| | conn = sqlite3.connect(str(CAPTIONS_DB_PATH))
|
| | cur = conn.cursor()
|
| | cur.execute(
|
| | """
|
| | CREATE TABLE IF NOT EXISTS captions (
|
| | variable TEXT PRIMARY KEY,
|
| | caption TEXT NOT NULL
|
| | );
|
| | """
|
| | )
|
| | conn.commit()
|
| | return conn
|
| |
|
| |
|
| | def insert_demo_feedback_row(
|
| | *,
|
| | user: str,
|
| | session: str,
|
| | video_name: str,
|
| | version: str,
|
| | une_ad: str,
|
| | free_ad: str,
|
| | comments: str | None,
|
| | transcripcio: int,
|
| | identificacio: int,
|
| | localitzacions: int,
|
| | activitats: int,
|
| | narracions: int,
|
| | expressivitat: int,
|
| | ) -> None:
|
| | """Insereix una valoració detallada a demo/data/feedback.db.
|
| |
|
| | Escala els sliders de 0-7 a 0-100 i desa els textos d'UNE i narració lliure.
|
| | Les columnes de sliders tenen per nom el caption del slider a la UI.
|
| | """
|
| |
|
| |
|
| | def scale(v: int) -> int:
|
| | v = max(0, min(7, int(v)))
|
| | return int(round(v * 100.0 / 7.0))
|
| |
|
| | slider_values = {
|
| | "Precisió Descriptiva": scale(transcripcio),
|
| | "Sincronització Temporal": scale(identificacio),
|
| | "Claredat i Concisió": scale(localitzacions),
|
| | "Inclusió de Diàleg": scale(activitats),
|
| | "Contextualització": scale(narracions),
|
| | "Flux i Ritme de la Narració": scale(expressivitat),
|
| | }
|
| |
|
| | ts = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
|
| |
|
| | with _connect_feedback_db() as conn:
|
| | conn.execute(
|
| | """
|
| | INSERT INTO feedback (
|
| | timestamp, user, session, video_name, version, une_ad, free_ad, comments,
|
| | score_1, score_2, score_3, score_4, score_5, score_6,
|
| | "Precisió Descriptiva",
|
| | "Sincronització Temporal",
|
| | "Claredat i Concisió",
|
| | "Inclusió de Diàleg",
|
| | "Contextualització",
|
| | "Flux i Ritme de la Narració"
|
| | ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
|
| | """,
|
| | (
|
| | ts,
|
| | user,
|
| | session,
|
| | video_name,
|
| | version,
|
| | une_ad,
|
| | free_ad,
|
| | comments or "",
|
| | slider_values["Precisió Descriptiva"],
|
| | slider_values["Sincronització Temporal"],
|
| | slider_values["Claredat i Concisió"],
|
| | slider_values["Inclusió de Diàleg"],
|
| | slider_values["Contextualització"],
|
| | slider_values["Flux i Ritme de la Narració"],
|
| | slider_values["Precisió Descriptiva"],
|
| | slider_values["Sincronització Temporal"],
|
| | slider_values["Claredat i Concisió"],
|
| | slider_values["Inclusió de Diàleg"],
|
| | slider_values["Contextualització"],
|
| | slider_values["Flux i Ritme de la Narració"],
|
| | ),
|
| | )
|
| |
|
| |
|
| | def _connect_videos_db() -> sqlite3.Connection:
|
| | """Connexió directa a demo/temp/videos.db.
|
| |
|
| | Aquesta BD conté metadades dels vídeos (video_name, owner, visibility, sha1sum...).
|
| | """
|
| |
|
| | VIDEOS_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
| | conn = sqlite3.connect(str(VIDEOS_DB_PATH))
|
| | conn.row_factory = sqlite3.Row
|
| | return conn
|
| |
|
| |
|
| | def _connect_simple_mapping_db(db_path: Path, table_name: str) -> sqlite3.Connection:
|
| | """Connexió a una BD simple (sha1sum, name, description) a demo/temp/db.
|
| |
|
| | Es fa servir per a casting.db i scenarios.db.
|
| | """
|
| |
|
| | db_path.parent.mkdir(parents=True, exist_ok=True)
|
| | conn = sqlite3.connect(str(db_path))
|
| | conn.row_factory = sqlite3.Row
|
| | cur = conn.cursor()
|
| | cur.execute(
|
| | f"""
|
| | CREATE TABLE IF NOT EXISTS {table_name} (
|
| | sha1sum TEXT NOT NULL,
|
| | name TEXT NOT NULL,
|
| | description TEXT
|
| | );
|
| | """
|
| | )
|
| | conn.commit()
|
| | return conn
|
| |
|
| |
|
| | def _connect_casting_db() -> sqlite3.Connection:
|
| | """Connexió directa a demo/temp/db/casting.db (taula casting)."""
|
| |
|
| | return _connect_simple_mapping_db(CASTING_DB_PATH, "casting")
|
| |
|
| |
|
| | def _connect_scenarios_db() -> sqlite3.Connection:
|
| | """Connexió directa a demo/temp/db/scenarios.db (taula scenarios)."""
|
| |
|
| | return _connect_simple_mapping_db(SCENARIOS_DB_PATH, "scenarios")
|
| |
|
| |
|
| | def insert_casting_row(sha1sum: str, name: str, description: str | None = None) -> None:
|
| | """Insereix un personatge a casting.db per a un sha1sum donat."""
|
| |
|
| | if not sha1sum or not name:
|
| | return
|
| |
|
| | try:
|
| | with _connect_casting_db() as conn:
|
| | conn.execute(
|
| | "INSERT INTO casting (sha1sum, name, description) VALUES (?,?,?)",
|
| | (sha1sum, name, description or ""),
|
| | )
|
| | except sqlite3.OperationalError:
|
| | return
|
| |
|
| |
|
| | def insert_scenario_row(sha1sum: str, name: str, description: str | None = None) -> None:
|
| | """Insereix un escenari a scenarios.db per a un sha1sum donat."""
|
| |
|
| | if not sha1sum or not name:
|
| | return
|
| |
|
| | try:
|
| | with _connect_scenarios_db() as conn:
|
| | conn.execute(
|
| | "INSERT INTO scenarios (sha1sum, name, description) VALUES (?,?,?)",
|
| | (sha1sum, name, description or ""),
|
| | )
|
| | except sqlite3.OperationalError:
|
| | return
|
| |
|
| |
|
| | def log_action(
|
| | *,
|
| | session: str,
|
| | user: str,
|
| | phone: str,
|
| | action: str,
|
| | sha1sum: str,
|
| | timestamp: Optional[str] = None,
|
| | ) -> None:
|
| | """Insereix un registre a demo/temp/actions.db (taula actions)."""
|
| |
|
| | ts = timestamp or datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
|
| |
|
| | insert_action(
|
| | session=session or "",
|
| | user=user or "",
|
| | phone=phone or "",
|
| | action=action,
|
| | sha1sum=sha1sum or "",
|
| | timestamp=ts,
|
| | )
|
| |
|
| |
|
| | def has_video_approval_action(sha1sum: str) -> bool:
|
| | """Comprova si existeix una acció d'acceptació d'input per a un sha1sum.
|
| |
|
| | Busca a demo/temp/actions.db una fila amb action='input-OK' i el sha1sum
|
| | especificat.
|
| | """
|
| |
|
| | if not sha1sum:
|
| | return False
|
| |
|
| | try:
|
| | with _connect_actions_db() as conn:
|
| | cur = conn.execute(
|
| | "SELECT 1 FROM actions WHERE action = ? AND sha1sum = ? LIMIT 1",
|
| | ("input-OK", sha1sum),
|
| | )
|
| | return cur.fetchone() is not None
|
| | except sqlite3.OperationalError:
|
| |
|
| | return False
|
| |
|
| |
|
| | def get_feedback_video_stats(agg: str = "mitjana") -> List[Dict[str, Any]]:
|
| | """Retorna estadístiques agregades per vídeo de demo/data/feedback.db.
|
| |
|
| | Es basa exclusivament en les columnes numèriques score_1..score_6 (0-100).
|
| |
|
| | agg pot ser:
|
| | - "mitjana": mitjana dels scores per vídeo.
|
| | - "mediana": mediana dels scores per vídeo.
|
| | - "inicial": primer registre (per timestamp) per vídeo.
|
| | - "actual": darrer registre (per timestamp) per vídeo.
|
| | """
|
| |
|
| | agg = (agg or "mitjana").lower()
|
| | with _connect_feedback_db() as conn:
|
| | cur = conn.execute(
|
| | """
|
| | SELECT
|
| | video_name,
|
| | timestamp,
|
| | score_1,
|
| | score_2,
|
| | score_3,
|
| | score_4,
|
| | score_5,
|
| | score_6
|
| | FROM feedback
|
| | """
|
| | )
|
| | rows = cur.fetchall()
|
| |
|
| | by_video: Dict[str, List[Dict[str, Any]]] = {}
|
| | for row in rows:
|
| | vn = row["video_name"]
|
| | parsed_scores = [
|
| | row["score_1"],
|
| | row["score_2"],
|
| | row["score_3"],
|
| | row["score_4"],
|
| | row["score_5"],
|
| | row["score_6"],
|
| | ]
|
| | enriched = {
|
| | "video_name": vn,
|
| | "timestamp": row["timestamp"],
|
| | "scores": parsed_scores,
|
| | }
|
| | by_video.setdefault(vn, []).append(enriched)
|
| |
|
| | def parse_ts(ts: str) -> datetime:
|
| |
|
| | try:
|
| | return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
|
| | except Exception:
|
| | return datetime.min
|
| |
|
| | result: List[Dict[str, Any]] = []
|
| | for video_name, vrows in by_video.items():
|
| | if not vrows:
|
| | continue
|
| |
|
| |
|
| | vrows_sorted = sorted(vrows, key=lambda r: parse_ts(r["timestamp"]))
|
| |
|
| | def agg_index(idx: int) -> Optional[float]:
|
| |
|
| |
|
| | raw_vals = [r["scores"][idx] for r in vrows]
|
| | vals: List[float] = []
|
| | for v in raw_vals:
|
| | if v is None:
|
| | continue
|
| | try:
|
| | vals.append(float(v))
|
| | except (TypeError, ValueError):
|
| |
|
| | continue
|
| |
|
| | if not vals:
|
| | return None
|
| |
|
| | if agg == "mitjana":
|
| | return float(sum(vals) / len(vals))
|
| | if agg == "mediana":
|
| | return float(median(vals))
|
| | if agg == "inicial":
|
| | first = vals[0]
|
| | return float(first) if first is not None else None
|
| | if agg == "actual":
|
| | last = vals[-1]
|
| | return float(last) if last is not None else None
|
| |
|
| | return float(sum(vals) / len(vals))
|
| |
|
| | row_out: Dict[str, Any] = {
|
| | "video_name": video_name,
|
| | "n": len(vrows),
|
| | }
|
| | for i in range(6):
|
| | row_out[f"score_{i+1}"] = agg_index(i)
|
| |
|
| | result.append(row_out)
|
| |
|
| |
|
| | result.sort(key=lambda r: r["video_name"])
|
| | return result
|
| |
|
| |
|
| | def get_feedback_rows_for_video_version(video_name: str, version: str) -> List[sqlite3.Row]:
|
| | """Retorna totes les files de feedback per a un vídeo i versió concrets.
|
| |
|
| | Només s'utilitzen les columnes score_1..score_6 (0-100) per a estadístiques
|
| | de distribució a la UI.
|
| | """
|
| |
|
| | if not video_name or not version:
|
| | return []
|
| |
|
| | try:
|
| | with _connect_feedback_db() as conn:
|
| | cur = conn.execute(
|
| | """
|
| | SELECT score_1, score_2, score_3, score_4, score_5, score_6
|
| | FROM feedback
|
| | WHERE video_name = ? AND version = ?
|
| | """,
|
| | (video_name, version),
|
| | )
|
| | return cur.fetchall()
|
| | except sqlite3.OperationalError:
|
| | return []
|
| |
|
| |
|
| | def _init_captions_from_eval() -> None:
|
| | """Inicialitza captions.db agafant etiquetes des d'un eval.csv.
|
| |
|
| | Per simplicitat, intentem llegir `demo/data/media/parella/MoE/eval.csv`.
|
| | Si no existeix o falla, es deixen etiquetes per defecte.
|
| | """
|
| |
|
| | base_demo = Path(__file__).resolve().parent
|
| | eval_path = base_demo / "data" / "media" / "parella" / "MoE" / "eval.csv"
|
| |
|
| | default_labels = [f"score_{i}" for i in range(1, 7)]
|
| | labels = default_labels[:]
|
| |
|
| | if eval_path.exists():
|
| | try:
|
| | import csv
|
| |
|
| | with eval_path.open("r", encoding="utf-8") as f:
|
| | reader = csv.DictReader(f)
|
| | tmp: List[str] = []
|
| | for row in reader:
|
| | if len(tmp) >= 6:
|
| | break
|
| | name = (row.get("Caracteristica") or "").strip().strip('"')
|
| | if name:
|
| | tmp.append(name)
|
| | if tmp:
|
| | labels = tmp
|
| | while len(labels) < 6:
|
| | labels.append(default_labels[len(labels)])
|
| | labels = labels[:6]
|
| | except Exception:
|
| | pass
|
| |
|
| | with _connect_captions_db() as conn:
|
| | cur = conn.cursor()
|
| | cur.execute("DELETE FROM captions")
|
| | for i in range(6):
|
| | cur.execute(
|
| | "INSERT OR REPLACE INTO captions (variable, caption) VALUES (?, ?)",
|
| | (f"score_{i+1}", labels[i]),
|
| | )
|
| |
|
| |
|
| | def get_feedback_score_labels() -> List[str]:
|
| | """Retorna les etiquetes humanes per a score_1..score_6 des de captions.db.
|
| |
|
| | Si captions.db és buit, s'intenta inicialitzar-lo a partir d'un eval.csv.
|
| | """
|
| |
|
| | default_labels = [f"score_{i}" for i in range(1, 7)]
|
| |
|
| | with _connect_captions_db() as conn:
|
| | cur = conn.cursor()
|
| | cur.execute("SELECT variable, caption FROM captions ORDER BY variable")
|
| | rows = cur.fetchall()
|
| |
|
| | if not rows:
|
| |
|
| | _init_captions_from_eval()
|
| | cur.execute("SELECT variable, caption FROM captions ORDER BY variable")
|
| | rows = cur.fetchall()
|
| |
|
| | if not rows:
|
| | return default_labels
|
| |
|
| | labels: List[str] = []
|
| | for _, caption in rows:
|
| | labels.append(caption)
|
| |
|
| | while len(labels) < 6:
|
| | labels.append(default_labels[len(labels)])
|
| |
|
| | return labels[:6]
|
| |
|
| |
|
| | def get_feedback_ad_stats():
|
| |
|
| | with get_conn() as conn:
|
| | cur = conn.execute(
|
| | """
|
| | SELECT
|
| | video_name,
|
| | COUNT(*) AS n,
|
| | AVG(transcripcio) AS avg_transcripcio,
|
| | AVG(identificacio) AS avg_identificacio,
|
| | AVG(localitzacions) AS avg_localitzacions,
|
| | AVG(activitats) AS avg_activitats,
|
| | AVG(narracions) AS avg_narracions,
|
| | AVG(expressivitat) AS avg_expressivitat,
|
| | (AVG(transcripcio)+AVG(identificacio)+AVG(localitzacions)+AVG(activitats)+AVG(narracions)+AVG(expressivitat))/6.0 AS avg_global
|
| | FROM feedback_ad
|
| | GROUP BY video_name
|
| | ORDER BY avg_global DESC, n DESC;
|
| | """
|
| | )
|
| | return cur.fetchall()
|
| |
|
| |
|
| | def now_str():
|
| | return datetime.utcnow().isoformat(timespec="seconds") + "Z"
|
| |
|
| |
|
| |
|
| |
|
| | def create_user(username: str, password_hash: str, role: str):
|
| | with get_conn() as conn:
|
| | conn.execute(
|
| | "INSERT INTO users(username, password_hash, role, created_at) VALUES (?,?,?,?)",
|
| | (username, password_hash, role, now_str()),
|
| | )
|
| |
|
| |
|
| | def get_user(username: str):
|
| | with get_conn() as conn:
|
| | cur = conn.execute("SELECT * FROM users WHERE username=?", (username,))
|
| | return cur.fetchone()
|
| |
|
| |
|
| | def get_all_users() -> List[Dict[str, Any]]:
|
| | with get_conn() as conn:
|
| | cur = conn.execute("SELECT id, username, role FROM users ORDER BY username")
|
| | return cur.fetchall()
|
| |
|
| |
|
| | def update_user_password(username: str, password_hash: str):
|
| | with get_conn() as conn:
|
| | conn.execute(
|
| | "UPDATE users SET password_hash = ? WHERE username = ?",
|
| | (password_hash, username),
|
| | )
|
| |
|