# osc-usage-dashboard / data_loader.py
"""Load data from HF Dataset with Streamlit caching."""
from __future__ import annotations
import json
import pandas as pd
import streamlit as st
from huggingface_hub import hf_hub_download
DATASET_REPO = "buckeyeguy/osc-usage-data"
@st.cache_data(ttl=300)
def load_data() -> tuple[pd.DataFrame, pd.DataFrame, dict]:
    """Download Parquet + metadata from the HF Dataset. Cached for 5 min.

    Returns:
        (jobs, snapshots, metadata): jobs and snapshots as DataFrames,
        metadata as the parsed contents of metadata.json. The jobs frame
        is augmented with derived time columns and `outcome_category`.
    """
    jobs_path = hf_hub_download(repo_id=DATASET_REPO, filename="jobs.parquet", repo_type="dataset")
    snapshots_path = hf_hub_download(
        repo_id=DATASET_REPO, filename="snapshots.parquet", repo_type="dataset"
    )
    metadata_path = hf_hub_download(
        repo_id=DATASET_REPO, filename="metadata.json", repo_type="dataset"
    )
    jobs = pd.read_parquet(jobs_path)
    snapshots = pd.read_parquet(snapshots_path)
    # Explicit encoding: JSON is UTF-8 by spec; don't depend on the locale default.
    with open(metadata_path, encoding="utf-8") as f:
        metadata = json.load(f)
    _add_time_columns(jobs)
    _classify_outcomes(jobs)
    return jobs, snapshots, metadata


def _add_time_columns(jobs: pd.DataFrame) -> None:
    """Coerce datetime columns and add calendar/duration columns (mutates `jobs`)."""
    for col in ["submit_time", "start_time", "end_time"]:
        if col in jobs.columns:
            jobs[col] = pd.to_datetime(jobs[col])
    if "end_time" in jobs.columns:
        jobs["end_date"] = jobs["end_time"].dt.date
        jobs["end_month"] = jobs["end_time"].dt.to_period("M").astype(str)
        jobs["end_dow"] = jobs["end_time"].dt.dayofweek  # 0=Mon
        jobs["end_hour"] = jobs["end_time"].dt.hour
    if "walltime_used" in jobs.columns:
        # walltime_used is treated as seconds throughout this module.
        jobs["walltime_hours"] = jobs["walltime_used"] / 3600.0
    # Queue wait time in hours (start - submit).
    if "submit_time" in jobs.columns and "start_time" in jobs.columns:
        jobs["wait_hours"] = (jobs["start_time"] - jobs["submit_time"]).dt.total_seconds() / 3600.0


def _classify_outcomes(jobs: pd.DataFrame) -> None:
    """Add `outcome_category` combining exit state with launch behavior (mutates `jobs`).

    Batch jobs map directly from `last_state`; interactive jobs (per
    INTERACTIVE_METHODS) are reclassified as Quick Exit / Failed /
    User Ended / Session Expired.
    """
    if "launch_method" not in jobs.columns or "last_state" not in jobs.columns:
        return
    import numpy as np
    from config import INTERACTIVE_METHODS, QUICK_EXIT_SECONDS

    is_interactive = jobs["launch_method"].isin(INTERACTIVE_METHODS)
    # Fallback must be aligned to jobs.index: an empty default Series would
    # misalign in the boolean ops below. All-NaN makes every `wt <` test False.
    if "walltime_used" in jobs.columns:
        wt = jobs["walltime_used"]
    else:
        wt = pd.Series(float("nan"), index=jobs.index, dtype="float64")
    state = jobs["last_state"]
    # Start with batch classification (maps exit state directly).
    outcome = state.map(
        {
            "COMPLETED": "Completed",
            "FAILED": "Failed",
            "TIMEOUT": "Timed Out",
            "OUT_OF_MEMORY": "Out of Memory",
        }
    ).fillna("Cancelled")  # All CANCELLED variants + NODE_FAIL → "Cancelled"
    # Overrides for interactive jobs, applied in increasing priority below.
    is_quick = is_interactive & (wt < QUICK_EXIT_SECONDS)
    is_failed_interactive = is_interactive & state.isin({"FAILED", "OUT_OF_MEMORY"})
    is_user_ended = (
        is_interactive & ~is_quick & ~is_failed_interactive & state.str.startswith("CANCELLED")
    )
    is_session_expired = (
        is_interactive & ~is_quick & ~is_failed_interactive & (state == "TIMEOUT")
    )
    outcome = np.where(is_quick, "Quick Exit", outcome)
    outcome = np.where(is_failed_interactive, "Failed", outcome)
    outcome = np.where(is_user_ended, "User Ended", outcome)
    outcome = np.where(is_session_expired, "Session Expired", outcome)
    jobs["outcome_category"] = outcome
def filter_jobs(
jobs: pd.DataFrame,
date_range: tuple | None = None,
projects: list[str] | None = None,
users: list[str] | None = None,
systems: list[str] | None = None,
) -> pd.DataFrame:
"""Apply sidebar filters to jobs DataFrame."""
df = jobs.copy()
if date_range and "end_date" in df.columns:
df = df[df["end_date"].between(date_range[0], date_range[1])]
if projects:
df = df[df["project_code"].isin(projects)]
if users:
df = df[df["username"].isin(users)]
if systems:
df = df[df["system_code"].isin(systems)]
return df