| """Data manager for loading and caching pipeline results. | |
| This module encapsulates the logic for computing the heavy data | |
| transformations in ``pipeline.py`` and persisting the results to disk. | |
| It adds a small amount of resilience around caching and uses | |
| ``logging`` instead of printing directly to stdout. The cache files | |
| include a version tag to make it easy to invalidate caches when | |
| fundamental changes are made to the pipeline logic. | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| import os | |
| import tempfile | |
| from functools import lru_cache | |
| from pathlib import Path | |
| from typing import Dict | |
| import pandas as pd | |
| from . import config, pipeline | |
| logger = logging.getLogger(__name__) | |

# ---------------------------------------------------------------------------
# Cache setup
# ---------------------------------------------------------------------------

# A version tag to embed into the cache filenames. Bump this value
# whenever the underlying ``pipeline`` logic changes in a way that
# invalidates existing caches.
CACHE_VERSION: str = "v1"
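
# With the tag above, the cache files defined below resolve to e.g.
# ``daioe_weighted_v1.csv`` and ``daioe_simple_v1.csv``.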


def _resolve_cache_dir() -> Path:
    """Select a writable directory for caching.

    The lookup order is:

    1. The ``DATA_CACHE_DIR`` environment variable, if set.
    2. A ``data`` folder at the repository root.
    3. A subdirectory of the system temporary directory.

    Each candidate path is tested for writability by attempting to
    create and delete a sentinel file. The first path that succeeds
    is returned. If none succeed, a final fallback under the system
    temporary directory is created and returned.
    """
    candidates: list[Path] = []
    env = os.getenv("DATA_CACHE_DIR")
    if env:
        candidates.append(Path(env).expanduser().resolve())
    candidates.append(Path(__file__).resolve().parent.parent / "data")
    candidates.append(Path(tempfile.gettempdir()) / "employment_ai_cache")
    for path in candidates:
        try:
            path.mkdir(parents=True, exist_ok=True)
            test_file = path / f".write_test_{os.getpid()}"
            test_file.write_text("ok", encoding="utf-8")
            test_file.unlink()
            return path
        except Exception:
            continue
    fallback = Path(tempfile.gettempdir()) / "employment_ai_cache"
    fallback.mkdir(parents=True, exist_ok=True)
    return fallback
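

# Resolved once at import time: set the ``DATA_CACHE_DIR`` environment
# variable before importing this module if the cache needs redirecting.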
DATA_DIR: Path = _resolve_cache_dir()
WEIGHTED_CACHE: Path = DATA_DIR / f"daioe_weighted_{CACHE_VERSION}.csv"
SIMPLE_CACHE: Path = DATA_DIR / f"daioe_simple_{CACHE_VERSION}.csv"


def _atomic_to_csv(df: pd.DataFrame, path: Path) -> None:
    """Write a DataFrame to CSV atomically."""
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_suffix(path.suffix + ".tmp")
    df.to_csv(tmp_path, index=False)
    tmp_path.replace(path)


@lru_cache(maxsize=1)
def _compute_pipeline_payload() -> Dict[str, pd.DataFrame]:
    """Run the heavy pipeline calculation (memoised once per process)."""
    results = pipeline.run_pipeline()
    weighted_frames: list[pd.DataFrame] = []
    simple_frames: list[pd.DataFrame] = []
    for _, taxonomy in config.TAXONOMY_OPTIONS:
        payload = results.get(taxonomy)
        if not payload:
            continue
        weighted = payload.get("weighted")
        simple = payload.get("simple")
        if isinstance(weighted, pd.DataFrame) and not weighted.empty:
            weighted_frames.append(weighted)
        if isinstance(simple, pd.DataFrame) and not simple.empty:
            simple_frames.append(simple)
    if not weighted_frames or not simple_frames:
        raise RuntimeError("Pipeline did not return weighted + simple datasets.")
    return {
        "weighted": pd.concat(weighted_frames, ignore_index=True),
        "simple": pd.concat(simple_frames, ignore_index=True),
    }


def load_payload(force_recompute: bool = False) -> Dict[str, pd.DataFrame]:
    """Load data from disk cache if available, otherwise compute and save."""
    if not force_recompute and WEIGHTED_CACHE.exists() and SIMPLE_CACHE.exists():
        logger.info("Loading pipeline output from cache directory %s", DATA_DIR)
        try:
            dtype = {"taxonomy": str, "code": str}
            weighted_df = pd.read_csv(WEIGHTED_CACHE, dtype=dtype)
            simple_df = pd.read_csv(SIMPLE_CACHE, dtype=dtype)
            return {"weighted": weighted_df, "simple": simple_df}
        except Exception as exc:
            logger.warning(
                "Error reading cache files %s and %s: %s; falling back to recompute",
                WEIGHTED_CACHE,
                SIMPLE_CACHE,
                exc,
            )
    if force_recompute:
        _compute_pipeline_payload.cache_clear()
    logger.info("Computing pipeline data – this may take a while…")
    payload = _compute_pipeline_payload()
    try:
        _atomic_to_csv(payload["weighted"], WEIGHTED_CACHE)
        _atomic_to_csv(payload["simple"], SIMPLE_CACHE)
        logger.info(
            "Cache updated: weighted=%s, simple=%s",
            WEIGHTED_CACHE.name,
            SIMPLE_CACHE.name,
        )
    except Exception as exc:
        logger.warning("Could not write cache files: %s", exc)
    return payload


def load_data(force_recompute: bool = False) -> Dict[str, pd.DataFrame]:
    """Load app-ready frames per taxonomy (from cache when possible)."""
    payload = load_payload(force_recompute=force_recompute)
    weighted_all = payload["weighted"]
    simple_all = payload["simple"]
    frames: Dict[str, pd.DataFrame] = {}
    for _, taxonomy in config.TAXONOMY_OPTIONS:
        dfs: list[pd.DataFrame] = []
        weighted = (
            weighted_all[weighted_all["taxonomy"] == taxonomy]
            if "taxonomy" in weighted_all.columns
            else weighted_all
        )
        simple = (
            simple_all[simple_all["taxonomy"] == taxonomy]
            if "taxonomy" in simple_all.columns
            else simple_all
        )
        if not weighted.empty:
            tmp = weighted.copy()
            tmp["weighting"] = "emp_weighted"
            tmp["weighting_label"] = config.WEIGHTING_OPTIONS[0][0]
            dfs.append(tmp)
        if not simple.empty:
            tmp = simple.copy()
            tmp["weighting"] = "simple_avg"
            tmp["weighting_label"] = config.WEIGHTING_OPTIONS[1][0]
            dfs.append(tmp)
        if dfs:
            full = pd.concat(dfs, ignore_index=True)
            if "year" in full.columns:
                full["year"] = full["year"].astype(int)
            if "level" in full.columns:
                full["level"] = full["level"].astype(int)
            frames[taxonomy] = full
    if not frames:
        raise RuntimeError("No aggregated DAIOE datasets could be built in-memory.")
    return frames
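

# A minimal smoke-test sketch, not part of the pipeline itself. Because this
# module uses relative imports, run it as ``python -m <package>.data_manager``
# (the package name depends on your checkout).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Pass force_recompute=True to bypass and rebuild the on-disk cache.
    frames = load_data()
    for taxonomy, frame in frames.items():
        logger.info(
            "taxonomy=%s rows=%d columns=%d", taxonomy, len(frame), frame.shape[1]
        )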