| | """ |
| | Query helpers for the SQLite curation index. |
| | |
| | Design goals: |
| | - fast filtering for dataset selection |
| | - safe parameterized SQL (no arbitrary SQL execution by default) |
| | - easy export to paths / jsonl |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import json |
| | import sqlite3 |
| | from dataclasses import dataclass |
| | from pathlib import Path |
| | from typing import Any, Dict, Iterator, List, Optional, Sequence |
| |
|
| |
|
@dataclass(frozen=True)
class QueryFilters:
    """Filter criteria for dataset-selection queries over the curation index.

    Every field defaults to None, meaning "do not filter on this column".
    The first group is matched against columns of the ``bundles`` table;
    the ``packed_depth_*`` group is matched against the joined ``devices``
    table and implicitly requires ``packed_depth_index_path IS NOT NULL``
    (see the query helpers in this module).
    """

    # Exact match on bundles.source_format.
    source_format: Optional[str] = None
    # Tri-state: None = no filter; True/False compare against the stored 0/1 flag.
    has_packed_depth: Optional[bool] = None
    # Exact match on bundles.scene_type.
    scene_type: Optional[str] = None
    # Exact match on bundles.operating_regime.
    operating_regime: Optional[str] = None
    # Keep bundles with bundles.num_devices >= this value.
    min_devices: Optional[int] = None

    # Device-level thresholds: setting either one causes the query helpers to
    # JOIN the devices table and filter on packed-depth statistics.
    packed_depth_min_frames: Optional[int] = None
    packed_depth_max_gaps: Optional[int] = None
| |
|
| |
|
| | def _connect_ro(db_path: Path) -> sqlite3.Connection: |
| | |
| | p = Path(db_path).resolve() |
| | conn = sqlite3.connect(f"file:{p.as_posix()}?mode=ro", uri=True) |
| | conn.row_factory = sqlite3.Row |
| | return conn |
| |
|
| |
|
def query_bundle_dirs(
    *,
    db_path: Path,
    filters: Optional[QueryFilters] = None,
    limit: Optional[int] = None,
    order_by: str = "bundle_dir",
) -> List[str]:
    """
    Return a list of bundle_dir strings matching filters.

    All filter values are bound as SQL parameters; order_by is validated
    against a fixed whitelist, so no caller input is interpolated unsafely.
    """
    flt = filters or QueryFilters()

    # Only a fixed set of columns may drive ordering; anything else silently
    # falls back to the default sort key.
    if order_by not in {"bundle_dir", "capture_id", "created_at", "scene_type"}:
        order_by = "bundle_dir"

    clauses: List[str] = []
    args: List[Any] = []
    needs_device_join = False

    # Bundle-level filters (no join required).
    if flt.source_format is not None:
        clauses.append("b.source_format = ?")
        args.append(str(flt.source_format))
    if flt.has_packed_depth is not None:
        clauses.append("b.has_packed_depth = ?")
        args.append(1 if bool(flt.has_packed_depth) else 0)
    if flt.scene_type is not None:
        clauses.append("b.scene_type = ?")
        args.append(str(flt.scene_type))
    if flt.operating_regime is not None:
        clauses.append("b.operating_regime = ?")
        args.append(int(flt.min_devices)) if False else args.append(str(flt.operating_regime))
    if flt.min_devices is not None:
        clauses.append("b.num_devices >= ?")
        args.append(int(flt.min_devices))

    # Device-level filters require joining the devices table; either one also
    # implies a usable packed-depth index.
    if flt.packed_depth_min_frames is not None or flt.packed_depth_max_gaps is not None:
        needs_device_join = True
        clauses.append("d.packed_depth_index_path IS NOT NULL")
        if flt.packed_depth_min_frames is not None:
            clauses.append("d.packed_depth_frames_count >= ?")
            args.append(int(flt.packed_depth_min_frames))
        if flt.packed_depth_max_gaps is not None:
            clauses.append("d.packed_depth_gaps <= ?")
            args.append(int(flt.packed_depth_max_gaps))

    where_sql = ("WHERE " + " AND ".join(clauses)) if clauses else ""
    join_sql = "JOIN devices d ON d.bundle_dir = b.bundle_dir " if needs_device_join else ""

    # DISTINCT guards against duplicate bundle rows when the devices join
    # matches more than one device per bundle.
    sql = (
        "SELECT DISTINCT b.bundle_dir AS bundle_dir "
        "FROM bundles b "
        + join_sql
        + where_sql
        + f" ORDER BY b.{order_by} ASC"
    )
    if limit is not None and int(limit) > 0:
        sql += " LIMIT ?"
        args.append(int(limit))

    conn = _connect_ro(Path(db_path))
    try:
        return [str(row["bundle_dir"]) for row in conn.execute(sql, args).fetchall()]
    finally:
        conn.close()
| |
|
| |
|
def iter_bundle_shard_seeds(
    *,
    db_path: Path,
    filters: Optional[QueryFilters] = None,
    limit: Optional[int] = None,
    order_by: str = "bundle_dir",
) -> Iterator[Dict[str, Any]]:
    """
    Stream bundle "seed" rows needed for sample-index generation.

    Yields dicts with:
      - bundle_dir
      - num_devices
      - default_device_id
      - teacher_depth_count

    Rows are streamed from the cursor rather than materialized, so large
    result sets can be consumed incrementally.
    """
    flt = filters or QueryFilters()

    # Whitelist the sort column; unknown values fall back to bundle_dir.
    if order_by not in {"bundle_dir", "capture_id", "created_at", "scene_type"}:
        order_by = "bundle_dir"

    predicates: List[str] = []
    bind: List[Any] = []
    device_join = False

    # Bundle-table predicates.
    if flt.source_format is not None:
        predicates.append("b.source_format = ?")
        bind.append(str(flt.source_format))
    if flt.has_packed_depth is not None:
        predicates.append("b.has_packed_depth = ?")
        bind.append(1 if bool(flt.has_packed_depth) else 0)
    if flt.scene_type is not None:
        predicates.append("b.scene_type = ?")
        bind.append(str(flt.scene_type))
    if flt.operating_regime is not None:
        predicates.append("b.operating_regime = ?")
        bind.append(str(flt.operating_regime))
    if flt.min_devices is not None:
        predicates.append("b.num_devices >= ?")
        bind.append(int(flt.min_devices))

    # Device-table predicates; either threshold pulls in the devices join and
    # requires a non-NULL packed-depth index.
    if flt.packed_depth_min_frames is not None or flt.packed_depth_max_gaps is not None:
        device_join = True
        predicates.append("d.packed_depth_index_path IS NOT NULL")
        if flt.packed_depth_min_frames is not None:
            predicates.append("d.packed_depth_frames_count >= ?")
            bind.append(int(flt.packed_depth_min_frames))
        if flt.packed_depth_max_gaps is not None:
            predicates.append("d.packed_depth_gaps <= ?")
            bind.append(int(flt.packed_depth_max_gaps))

    select_sql = (
        "SELECT DISTINCT "
        "b.bundle_dir AS bundle_dir, "
        "COALESCE(b.num_devices, 0) AS num_devices, "
        "b.default_device_id AS default_device_id, "
        "COALESCE(b.teacher_depth_count, 0) AS teacher_depth_count "
        "FROM bundles b "
        + ("JOIN devices d ON d.bundle_dir = b.bundle_dir " if device_join else "")
        + (("WHERE " + " AND ".join(predicates)) if predicates else "")
        + f" ORDER BY b.{order_by} ASC"
    )
    if limit is not None and int(limit) > 0:
        select_sql += " LIMIT ?"
        bind.append(int(limit))

    conn = _connect_ro(Path(db_path))
    try:
        for row in conn.execute(select_sql, bind):
            yield {
                "bundle_dir": str(row["bundle_dir"]),
                "num_devices": int(row["num_devices"] or 0),
                "default_device_id": row["default_device_id"],
                "teacher_depth_count": int(row["teacher_depth_count"] or 0),
            }
    finally:
        conn.close()
| |
|
| |
|
def export_bundle_dirs_txt(bundle_dirs: Sequence[str], output_path: Path) -> None:
    """Write one bundle_dir per line to *output_path*, creating parent dirs.

    A non-empty list is terminated with a trailing newline; an empty list
    produces an empty file.

    Args:
        bundle_dirs: Bundle directory strings to write, in given order.
        output_path: Destination text file; parents are created as needed.
    """
    p = Path(output_path)
    p.parent.mkdir(parents=True, exist_ok=True)
    text = "\n".join(str(b) for b in bundle_dirs)
    if bundle_dirs:
        text += "\n"
    # Pin the encoding: write_text otherwise uses the platform/locale default,
    # which makes the exported file non-portable.
    p.write_text(text, encoding="utf-8")
| |
|
| |
|
def export_rows_jsonl(
    *,
    db_path: Path,
    bundle_dirs: Sequence[str],
    output_path: Path,
) -> None:
    """
    Export stored row_json for specific bundle dirs as JSONL.

    Rows are fetched in chunks of 500 bundle dirs so the ``IN (...)`` clause
    stays well under SQLite's bound-parameter limit. Each row_json value is
    round-tripped through json to normalize it; values that are not valid
    JSON are written out verbatim as a best-effort fallback rather than
    aborting the export.

    Args:
        db_path: Path to the SQLite index database.
        bundle_dirs: Bundle directories whose rows should be exported.
        output_path: Destination .jsonl file; parents are created as needed.
    """
    p = Path(output_path)
    p.parent.mkdir(parents=True, exist_ok=True)

    chunk = 500
    conn = _connect_ro(Path(db_path))
    try:
        # Pin the encoding so output does not depend on the platform default.
        with p.open("w", encoding="utf-8") as f:
            for i in range(0, len(bundle_dirs), chunk):
                sub = [str(x) for x in bundle_dirs[i : i + chunk]]
                if not sub:
                    continue
                qmarks = ",".join(["?"] * len(sub))
                sql = (
                    "SELECT row_json FROM bundles "
                    f"WHERE bundle_dir IN ({qmarks}) "
                    "ORDER BY bundle_dir ASC"
                )
                # Stream straight from the cursor; no need to materialize
                # each chunk's rows with fetchall().
                for r in conn.execute(sql, sub):
                    raw = r["row_json"]
                    try:
                        obj = json.loads(raw)
                        f.write(json.dumps(obj, default=str) + "\n")
                    except (TypeError, ValueError):
                        # Narrowed from `except Exception`: TypeError covers a
                        # non-str row_json (e.g. NULL), ValueError (incl.
                        # JSONDecodeError) covers malformed JSON. Deliberate
                        # best-effort: emit the raw value and keep going.
                        f.write(str(raw) + "\n")
    finally:
        conn.close()
| |
|