"""Load data from HF Dataset with Streamlit caching."""
from __future__ import annotations

import json

import numpy as np
import pandas as pd
import streamlit as st
from huggingface_hub import hf_hub_download
DATASET_REPO = "buckeyeguy/osc-usage-data"


# ttl=300 makes good on the "Cached for 5 min" contract in the docstring;
# `st` was imported but previously unused, so the decorator had been dropped.
@st.cache_data(ttl=300)
def load_data() -> tuple[pd.DataFrame, pd.DataFrame, dict]:
    """Download Parquet + metadata from HF Dataset. Cached for 5 min.

    Returns:
        (jobs, snapshots, metadata) — jobs and snapshots as DataFrames
        (jobs augmented with derived calendar/outcome/wait columns),
        metadata as the parsed JSON dict.
    """
    jobs_path = hf_hub_download(repo_id=DATASET_REPO, filename="jobs.parquet", repo_type="dataset")
    snapshots_path = hf_hub_download(
        repo_id=DATASET_REPO, filename="snapshots.parquet", repo_type="dataset"
    )
    metadata_path = hf_hub_download(
        repo_id=DATASET_REPO, filename="metadata.json", repo_type="dataset"
    )

    jobs = pd.read_parquet(jobs_path)
    snapshots = pd.read_parquet(snapshots_path)
    with open(metadata_path, encoding="utf-8") as f:
        metadata = json.load(f)

    # Ensure datetime columns
    for col in ["submit_time", "start_time", "end_time"]:
        if col in jobs.columns:
            jobs[col] = pd.to_datetime(jobs[col])

    # Add derived calendar columns (used for grouping/plotting downstream)
    if "end_time" in jobs.columns:
        jobs["end_date"] = jobs["end_time"].dt.date
        jobs["end_month"] = jobs["end_time"].dt.to_period("M").astype(str)
        jobs["end_dow"] = jobs["end_time"].dt.dayofweek  # 0=Mon
        jobs["end_hour"] = jobs["end_time"].dt.hour
    if "walltime_used" in jobs.columns:
        jobs["walltime_hours"] = jobs["walltime_used"] / 3600.0

    # Behavioral outcome classification
    if "launch_method" in jobs.columns and "last_state" in jobs.columns:
        from config import INTERACTIVE_METHODS, QUICK_EXIT_SECONDS

        is_interactive = jobs["launch_method"].isin(INTERACTIVE_METHODS)
        # Fallback must be aligned to jobs.index: an empty Series would
        # misalign the boolean masks below. NaN < threshold evaluates
        # False, so missing walltime never counts as a quick exit.
        if "walltime_used" in jobs.columns:
            wt = jobs["walltime_used"]
        else:
            wt = pd.Series(index=jobs.index, dtype="float64")
        state = jobs["last_state"]

        # Start with batch classification (maps exit state directly)
        outcome = state.map(
            {
                "COMPLETED": "Completed",
                "FAILED": "Failed",
                "TIMEOUT": "Timed Out",
                "OUT_OF_MEMORY": "Out of Memory",
            }
        ).fillna("Cancelled")  # All CANCELLED variants + NODE_FAIL → "Cancelled"

        # Override for interactive jobs
        is_quick = is_interactive & (wt < QUICK_EXIT_SECONDS)
        is_failed_interactive = is_interactive & state.isin({"FAILED", "OUT_OF_MEMORY"})
        # na=False: a missing state must not inject NaN into the boolean mask
        # (the map/fillna above shows NaN states do occur in this data).
        is_user_ended = (
            is_interactive
            & ~is_quick
            & ~is_failed_interactive
            & state.str.startswith("CANCELLED", na=False)
        )
        is_session_expired = (
            is_interactive & ~is_quick & ~is_failed_interactive & (state == "TIMEOUT")
        )
        outcome = np.where(is_quick, "Quick Exit", outcome)
        outcome = np.where(is_failed_interactive, "Failed", outcome)
        outcome = np.where(is_user_ended, "User Ended", outcome)
        outcome = np.where(is_session_expired, "Session Expired", outcome)
        jobs["outcome_category"] = outcome

    # Queue wait time
    if "submit_time" in jobs.columns and "start_time" in jobs.columns:
        jobs["wait_hours"] = (jobs["start_time"] - jobs["submit_time"]).dt.total_seconds() / 3600.0

    return jobs, snapshots, metadata
| def filter_jobs( | |
| jobs: pd.DataFrame, | |
| date_range: tuple | None = None, | |
| projects: list[str] | None = None, | |
| users: list[str] | None = None, | |
| systems: list[str] | None = None, | |
| ) -> pd.DataFrame: | |
| """Apply sidebar filters to jobs DataFrame.""" | |
| df = jobs.copy() | |
| if date_range and "end_date" in df.columns: | |
| df = df[df["end_date"].between(date_range[0], date_range[1])] | |
| if projects: | |
| df = df[df["project_code"].isin(projects)] | |
| if users: | |
| df = df[df["username"].isin(users)] | |
| if systems: | |
| df = df[df["system_code"].isin(systems)] | |
| return df | |