# zqiao11's picture
# perf: Add cache for predictions
# 003ab1c
import os
import sys
import plotly.graph_objects as go
# Add project root and src directory to Python path to enable imports from timebench
# Get the directory containing this file (leaderboard_app/src/)
current_dir = os.path.dirname(os.path.abspath(__file__))
# Get leaderboard_app directory
leaderboard_app_dir = os.path.dirname(current_dir)
# Try multiple paths for timebench import:
# 1. Current leaderboard_app directory (if timebench was copied to leaderboard_app/)
# 2. Parent directory's src (for local development: TIME/src/)
# Add current leaderboard_app directory first (for Space deployment)
if leaderboard_app_dir not in sys.path:
    sys.path.insert(0, leaderboard_app_dir)
# Get project root directory (TIME/) - for local development
project_root = os.path.dirname(leaderboard_app_dir)
if project_root not in sys.path:
    sys.path.insert(0, project_root)
src_dir = os.path.join(project_root, "src")
if src_dir not in sys.path and os.path.exists(src_dir):
    sys.path.insert(0, src_dir)
# The project imports below depend on the sys.path entries inserted above.
import json
import gradio as gr
from src.about import DATASET_CHOICES, ALL_MODELS, RESULTS_ROOT, FEATURES_DF, FEATURES_BOOL_DF, PATTERN_MAP
from src.leaderboard import (get_overall_leaderboard, get_dataset_multilevel_leaderboard,
                             get_window_leaderboard, get_pattern_leaderboard, resolve_dataset_id,
                             _get_dataset_metadata, _load_predictions_cached)
from src.about import DATASETS_DF, ALL_HORIZONS
# get_datasets_root, get_config_root no longer needed here β€” handled by _get_dataset_metadata
import numpy as np
import pandas as pd
from pathlib import Path
import ast
import matplotlib
# Backend must be selected before any pyplot usage so server workers never
# attempt to open a GUI window.
matplotlib.use('Agg')  # Use non-interactive backend for Gradio
import yaml
import tempfile
# Dataset, get_dataset_settings, load_dataset_config no longer needed here β€” handled by _get_dataset_metadata
from src.leaderboard import find_dataset_term_path
def export_dataframe_to_csv(df, filename_prefix="leaderboard"):
    """Export a DataFrame to a temporary CSV file and return the path for download.

    Args:
        df: pandas DataFrame to export
        filename_prefix: prefix for the temporary file name

    Returns:
        str: path to the temporary CSV file, or None if df is None or empty
    """
    # Treat both a missing frame and an empty frame as "nothing to export".
    if df is None or (hasattr(df, 'empty') and df.empty):
        return None
    # newline='' stops the text layer from re-translating the row terminators
    # that DataFrame.to_csv writes itself (without it, Windows emits \r\r\n).
    with tempfile.NamedTemporaryFile(
        mode='w', suffix='.csv', delete=False, newline='',
        prefix=f"{filename_prefix}_"
    ) as f:
        df.to_csv(f, index=False)
    return f.name
########################## Dataset Tab ##########################
def update_series_and_variate(display_name):
    """
    Refresh the series/variate dropdowns of the merged Dataset tab.

    Resolves the dataset display name to its on-disk term, then reads cached
    metadata to populate the dropdown choices.

    Args:
        display_name: Dataset display name from UI dropdown (will be resolved to dataset_id)
    """
    # Any model works here: the series/variate names are a dataset property.
    model_name = ALL_MODELS[0]
    results_root = str(RESULTS_ROOT)
    dataset_term = find_dataset_term_path(results_root, model_name, display_name)

    def _placeholder_pair():
        # Neutral dropdowns shown whenever the dataset cannot be resolved.
        return (
            gr.Dropdown(choices=["---"], value="---", label="Select Series", interactive=True),
            gr.Dropdown(choices=["---"], value="---", label="Select Variate", interactive=True),
        )

    if dataset_term is None:
        print(f"Error: dataset_term is None for display_name={display_name}, model_name={model_name}")
        return _placeholder_pair()

    # Horizon does not change the series/variate names, so "short" suffices.
    metadata = _get_dataset_metadata(dataset_term, "short")
    if metadata is None:
        return _placeholder_pair()

    series_choices = ["---"] + [str(name) for name in metadata["series_names"]]
    series_dd = gr.Dropdown(choices=series_choices, value="---", label="Select Series", interactive=True)

    if metadata["is_uts"]:
        # Univariate dataset: the variate dropdown stays disabled.
        variate_dd = gr.Dropdown(choices=["---"], value="---", label="Select Variate", interactive=False)
    else:
        # Multivariate dataset: expose the real variate names.
        variate_choices = ["---"] + [str(name) for name in metadata["variate_names"]]
        variate_dd = gr.Dropdown(choices=variate_choices, value="---", label="Select Variate", interactive=True)
    return series_dd, variate_dd
########################## Window Tab ##########################
def get_available_horizons(display_name):
    """
    Get available horizons for a dataset.

    Args:
        display_name: Dataset display name from UI dropdown

    Returns:
        list: Available horizons, e.g., ["short", "medium", "long"] or ["short"]
    """
    # Without the datasets table we cannot restrict anything.
    if DATASETS_DF.empty:
        return ALL_HORIZONS
    dataset_id = resolve_dataset_id(display_name)
    rows = DATASETS_DF[DATASETS_DF["dataset_id"] == dataset_id]
    # Unknown dataset id: fall back to the full horizon list.
    if rows.empty:
        return ALL_HORIZONS
    present = rows["horizon"].unique().tolist()
    # Preserve the canonical short/medium/long ordering.
    ordered = [h for h in ALL_HORIZONS if h in present]
    return ordered if ordered else ["short"]
def update_horizon_choices(display_name):
    """
    Update horizon Radio component's choices and value based on dataset.

    Args:
        display_name: Dataset display name from UI dropdown

    Returns:
        gr.Radio: Radio update with only the available horizons selectable
    """
    available = get_available_horizons(display_name)
    # Prefer "short"; otherwise fall back to the first available horizon.
    if "short" in available:
        selected = "short"
    elif available:
        selected = available[0]
    else:
        selected = "short"
    # Restrict choices to horizons this dataset actually provides.
    return gr.Radio(choices=[h for h in ALL_HORIZONS if h in available], value=selected)
def update_horizon_checkbox_choices(display_name):
    """
    Update horizon CheckboxGroup component's choices and value based on dataset.
    Used for Per Dataset tab.

    Args:
        display_name: Dataset display name from UI dropdown

    Returns:
        gr.CheckboxGroup: update with the dataset's horizons, all pre-selected
    """
    available = get_available_horizons(display_name)
    # Keep the canonical ordering and select every available horizon.
    visible_choices = [h for h in ALL_HORIZONS if h in available]
    return gr.CheckboxGroup(choices=visible_choices, value=visible_choices)
def update_series_variate_and_window(display_name, horizon):
    """
    Update series, variate, and window dropdown options based on dataset display_name and horizon.
    Uses cached dataset metadata to obtain the actual series and variate names.

    Args:
        display_name: Dataset display name from UI dropdown (will be resolved to dataset_id)
        horizon: Horizon name (short, medium, long)
    """

    def _disabled_triplet():
        # Shown whenever the dataset/metadata cannot be resolved.
        return (
            gr.Dropdown(choices=[], value=None, label="Select Series", interactive=False),
            gr.Dropdown(choices=[], value=None, label="Select Variate", interactive=False),
            gr.Dropdown(choices=[], value=None, label="Select Testing Window", interactive=False),
        )

    # Any model works: series/variate/window layout is a dataset property.
    model_name = ALL_MODELS[0]
    results_root = str(RESULTS_ROOT)
    dataset_term = find_dataset_term_path(results_root, model_name, display_name)
    if dataset_term is None:
        print(f"Error: dataset_term is None for display_name={display_name}, horizon={horizon}, model_name={model_name}")
        return _disabled_triplet()

    metadata = _get_dataset_metadata(dataset_term, horizon)
    if metadata is None:
        return _disabled_triplet()

    window_choices = [str(i) for i in range(metadata["num_windows"])]
    series_choices = [str(name) for name in metadata["series_names"]]

    if metadata["is_uts"]:
        # UTS: a single implicit variate, fixed at "0" and not selectable.
        variate_choices, variate_value, variate_on = ["0"], "0", False
    else:
        # MTS: expose every variate name and let the user pick.
        variate_choices = [str(name) for name in metadata["variate_names"]]
        variate_value, variate_on = variate_choices[0], True

    return (
        gr.Dropdown(choices=series_choices, value=series_choices[0], label="Select Series", interactive=True),
        gr.Dropdown(choices=variate_choices, value=variate_value, label="Select Variate", interactive=variate_on),
        gr.Dropdown(choices=window_choices, value=window_choices[0], label="Select Testing Window", interactive=True),
    )
def plot_window_series(display_name, series, variate, window_id, horizon, selected_quantiles, model):
    """
    Plot time series predictions for a specific window using Plotly for interactive visualization.
    Now includes full time series visualization with test window highlighted.
    Accepts series and variate names (strings) and converts them to indices.
    Args:
        display_name: Dataset display name from UI dropdown (will be resolved to dataset_id)
        series: Series name
        variate: Variate name
        window_id: Window index
        horizon: Horizon name
        selected_quantiles: List of quantile strings to plot
        model: Model name
    Returns:
        tuple: (fig, info_message) where fig is Plotly figure and info_message contains prediction details
    """
    print(f"πŸ” plot_window_series called: display_name={display_name}, series={series}, variate={variate}, window_id={window_id}, horizon={horizon}, model={model}")
    # Guard: every dropdown must have a selection before anything can be plotted.
    if display_name is None or series is None or variate is None or window_id is None:
        print("❌ Missing parameters")
        fig = go.Figure()
        fig.update_layout(title="Please select all parameters")
        return fig, ""
    results_root = str(RESULTS_ROOT)
    print(f"πŸ“ results_root: {results_root}")
    dataset_term = find_dataset_term_path(results_root, model, display_name)
    print(f"πŸ“ dataset_term: {dataset_term}")
    if dataset_term is None:
        print("❌ Dataset not found")
        fig = go.Figure()
        fig.update_layout(title="Dataset not found")
        return fig, ""
    # --- Cached predictions loading (biggest I/O in Per Test Window) ---
    pred_data = _load_predictions_cached(model, dataset_term, horizon)
    if pred_data is None:
        print(f"❌ Predictions file not found for {model}/{dataset_term}/{horizon}")
        fig = go.Figure()
        fig.update_layout(title="Predictions file not found for this horizon")
        return fig, ""
    predictions_quantiles = pred_data["predictions_quantiles"]  # (num_series, num_windows, 9, num_variates, prediction_length)
    quantile_levels = pred_data["quantile_levels"]  # [0.1, 0.2, ..., 0.9]
    # Load prediction scale factor from config.json (for float16 overflow prevention)
    model_config_path = os.path.join(results_root, model, dataset_term, horizon, "config.json")
    prediction_scale_factor = 1.0
    if os.path.exists(model_config_path):
        with open(model_config_path, "r") as f:
            model_config = json.load(f)
        prediction_scale_factor = model_config.get("prediction_scale_factor", 1.0)
    if prediction_scale_factor != 1.0:
        print(f"πŸ“Š Applying inverse scale factor: {prediction_scale_factor}")
        # Copy to avoid mutating the cached array (astype allocates a new array)
        predictions_quantiles = predictions_quantiles.astype(np.float32) * prediction_scale_factor
    # Use cached metadata for name-to-index mappings and Dataset object
    metadata = _get_dataset_metadata(dataset_term, horizon)
    if metadata is None:
        print("❌ Failed to load dataset metadata")
        fig = go.Figure()
        fig.update_layout(title="Failed to load dataset metadata")
        return fig, ""
    dataset_obj = metadata["dataset_obj"]
    dataset_freq = metadata["freq"]
    # NOTE(review): reaches into a private attribute of the Dataset object —
    # consider exposing test_length through the cached metadata instead.
    test_length = dataset_obj._test_length
    series_name_to_idx = metadata["series_name_to_idx"]
    variate_name_to_idx = metadata["variate_name_to_idx"]
    print(f"βœ… Dataset loaded from cache: {len(dataset_obj.hf_dataset)} series")
    print(f"πŸ“… Dataset frequency: {dataset_freq}")
    # Convert series name to index (falls back to parsing the string as an int)
    series_idx = None
    if series in series_name_to_idx:
        series_idx = series_name_to_idx[series]
        print(f"βœ… Found series '{series}' at index {series_idx}")
    else:
        series_idx = int(series)
        print(f"⚠️ Series '{series}' not found in names, using int index {series_idx}")
    # Convert variate name to index
    variate_idx = None
    if metadata["is_uts"]:
        variate_idx = 0
        print(f"ℹ️ UTS mode, variate_idx=0")
    elif variate in variate_name_to_idx:
        variate_idx = variate_name_to_idx[variate]
        print(f"βœ… Found variate '{variate}' at index {variate_idx}")
    else:
        variate_idx = int(variate)
        print(f"⚠️ Variate '{variate}' not found in names, using int index {variate_idx}")
    # NOTE(review): every branch above assigns, so these None fallbacks appear
    # unreachable; kept as defensive code.
    if series_idx is None:
        series_idx = int(series)
    if variate_idx is None:
        try:
            variate_idx = int(variate) if variate is not None else 0
        except (ValueError, TypeError):
            variate_idx = 0
    # Window id arrives as a dropdown string; convert for array indexing.
    window_idx = int(window_id)
    # Get pre-computed quantiles for this specific series, window, and variate
    quantiles_data = predictions_quantiles[series_idx, window_idx, :, variate_idx, :]  # (9, prediction_length)
    prediction_length = quantiles_data.shape[1]
    # Create mapping from quantile level string to index (e.g. "0.5" -> 4)
    quantile_level_to_idx = {f"{q:.1f}": i for i, q in enumerate(quantile_levels)}
    # Load full time series data
    full_series = None
    train_end_idx = None
    test_window_start_idx = None
    test_window_end_idx = None
    # Get full target time series for this series
    print(f"πŸ“Š Getting target for series_idx={series_idx}, variate_idx={variate_idx}")
    full_target = dataset_obj.hf_dataset[series_idx]["target"]
    print(f"πŸ“Š full_target shape: {full_target.shape}, dtype: {full_target.dtype}")
    print(f"πŸ“Š full_target first 10 values (all variates): {full_target[:, :10] if full_target.ndim > 1 else full_target[:10]}")
    # Get start timestamp for this series and create timestamp array
    series_start = dataset_obj.hf_dataset[series_idx]["start"]
    print(f"πŸ“… Series start timestamp: {series_start}, type: {type(series_start)}")
    # Handle numpy array containing datetime64 (common when reading from HF dataset)
    if isinstance(series_start, np.ndarray):
        # Extract scalar from array
        series_start = series_start.item() if series_start.ndim == 0 else series_start[0]
        print(f"πŸ“… Extracted scalar: {series_start}, type: {type(series_start)}")
    # Convert numpy datetime64 to pandas Timestamp
    if isinstance(series_start, (np.datetime64, str)):
        series_start = pd.Timestamp(series_start)
    # Calculate series length for timestamp creation
    # (2-D target is assumed (num_variates, series_length) — consistent with
    # the variate-first indexing below; confirm against the dataset writer)
    if full_target.ndim > 1:
        ts_length = full_target.shape[1]
    else:
        ts_length = len(full_target)
    # Create timestamp array for the entire series
    try:
        timestamps = pd.date_range(start=series_start, periods=ts_length, freq=dataset_freq)
        print(f"πŸ“… Created timestamp array: {timestamps[0]} to {timestamps[-1]}")
    except Exception as e:
        # Plotting still works with integer indices if timestamps cannot be built.
        print(f"⚠️ Failed to create timestamps: {e}, falling back to indices")
        timestamps = None
    # Handle multivariate case: extract specific variate
    if full_target.ndim > 1:
        full_series = full_target[variate_idx, :]  # Shape: (series_length,)
    else:
        full_series = full_target  # Shape: (series_length,)
    print(f"πŸ“Š full_series shape: {full_series.shape}, min: {full_series.min()}, max: {full_series.max()}, has_nan: {np.isnan(full_series).any()}")
    # Calculate train/test split point
    # Test data starts at: series_length - test_length
    series_length = len(full_series)
    train_end_idx = series_length - test_length
    # Calculate current test window position (windows tile the test region)
    test_window_start_idx = train_end_idx + window_idx * prediction_length
    test_window_end_idx = test_window_start_idx + prediction_length
    # Create Plotly figure
    fig = go.Figure()
    # Quantile colors - from light to dark
    quantile_colors = {
        "0.1": "#c6dbef", "0.9": "#c6dbef",  # lightest
        "0.2": "#6baed6", "0.8": "#6baed6",  # light
        "0.3": "#4292c6", "0.7": "#4292c6",  # medium
        "0.4": "#2171b5", "0.6": "#2171b5",  # dark
        "0.5": "#08306b",  # darkest (median)
    }
    # Calculate prediction time steps (overlay on the test window)
    if test_window_start_idx is not None:
        pred_time_steps = np.arange(test_window_start_idx, test_window_end_idx)
    else:
        pred_time_steps = np.arange(prediction_length)
    # Plot full time series if available
    time_steps = np.arange(len(full_series))
    # Use timestamps for x-axis if available
    if timestamps is not None:
        x_full = timestamps
        x_pred = timestamps[pred_time_steps] if test_window_start_idx is not None else timestamps[:prediction_length]
        x_window = timestamps[test_window_start_idx:test_window_end_idx] if test_window_start_idx is not None else None
    else:
        x_full = time_steps
        x_pred = pred_time_steps
        x_window = np.arange(test_window_start_idx, test_window_end_idx) if test_window_start_idx is not None else None
    # Plot full series in light gray
    fig.add_trace(go.Scatter(
        x=x_full,
        y=full_series,
        mode='lines',
        name='Full Time Series',
        line=dict(color='gray', width=1),
        opacity=0.6,
        hovertemplate='Time: %{x}<br>Value: %{y:.4f}<extra></extra>'
    ))
    # Add shapes for regions (training, test, current window)
    if train_end_idx is not None:
        # Training region - use timestamps if available
        x0_train = timestamps[0] if timestamps is not None else 0
        x1_train = timestamps[train_end_idx] if timestamps is not None else train_end_idx
        fig.add_shape(
            type="rect",
            x0=x0_train, x1=x1_train,
            y0=0, y1=1, yref="paper",
            fillcolor="blue", opacity=0.1,
            layer="below", line_width=0,
        )
        # Test region
        test_region_end = len(full_series)
        x0_test = timestamps[train_end_idx] if timestamps is not None else train_end_idx
        x1_test = timestamps[test_region_end-1] if timestamps is not None else test_region_end-1
        fig.add_shape(
            type="rect",
            x0=x0_test, x1=x1_test,
            y0=0, y1=1, yref="paper",
            fillcolor="orange", opacity=0.15,
            layer="below", line_width=0,
        )
    # Highlight current test window
    if test_window_start_idx is not None and test_window_end_idx is not None:
        # Use timestamps for window highlight if available
        x0_window = timestamps[test_window_start_idx] if timestamps is not None else test_window_start_idx
        x1_window = timestamps[test_window_end_idx-1] if timestamps is not None else test_window_end_idx-1
        fig.add_shape(
            type="rect",
            x0=x0_window, x1=x1_window,
            y0=0, y1=1, yref="paper",
            fillcolor="red", opacity=0.2,
            layer="below", line_width=0,
        )
        # Plot the test window portion of full series
        window_series = full_series[test_window_start_idx:test_window_end_idx]
        fig.add_trace(go.Scatter(
            x=x_window,
            y=window_series,
            mode='lines',
            name='Ground Truth (Window)',
            line=dict(color='red', width=2),
            opacity=0.8,
            hovertemplate='Time: %{x}<br>Value: %{y:.4f}<extra></extra>'
        ))
    # Quantile pairs mapping: UI selection -> (low, high) quantile values
    quantile_pair_map = {
        "0.1-0.9": ("0.1", "0.9"),
        "0.2-0.8": ("0.2", "0.8"),
        "0.3-0.7": ("0.3", "0.7"),
        "0.4-0.6": ("0.4", "0.6"),
    }
    # Helper function to get pre-computed quantile values
    def get_quantile_values(q_str):
        return quantiles_data[quantile_level_to_idx[q_str], :]
    # Plot quantile pairs with fill (based on paired selection)
    for pair_str, (q_low_str, q_high_str) in quantile_pair_map.items():
        if pair_str in selected_quantiles:
            quantile_low = get_quantile_values(q_low_str)
            quantile_high = get_quantile_values(q_high_str)
            color = quantile_colors.get(q_low_str, "#2171b5")
            # Add filled area between quantiles (forward upper path + reversed
            # lower path closes the polygon for fill='toself')
            fig.add_trace(go.Scatter(
                x=list(x_pred) + list(x_pred[::-1]),
                y=list(quantile_high) + list(quantile_low[::-1]),
                fill='toself',
                fillcolor=color,
                line=dict(color='rgba(255,255,255,0)'),
                hoverinfo="skip",
                showlegend=True,
                name=f'Q{q_low_str}-Q{q_high_str}',
                opacity=0.3
            ))
            # Add lower quantile line
            fig.add_trace(go.Scatter(
                x=x_pred,
                y=quantile_low,
                mode='lines',
                name=f'Q{q_low_str}',
                line=dict(color=color, width=1),
                opacity=0.7,
                showlegend=False,
                hovertemplate=f'Time: %{{x}}<br>Q{q_low_str}: %{{y:.4f}}<extra></extra>'
            ))
            # Add upper quantile line
            fig.add_trace(go.Scatter(
                x=x_pred,
                y=quantile_high,
                mode='lines',
                name=f'Q{q_high_str}',
                line=dict(color=color, width=1),
                opacity=0.7,
                showlegend=False,
                hovertemplate=f'Time: %{{x}}<br>Q{q_high_str}: %{{y:.4f}}<extra></extra>'
            ))
    # Plot median (0.5) if selected
    if "0.5" in selected_quantiles:
        quantile_values = get_quantile_values("0.5")
        color = quantile_colors.get("0.5", "#08306b")
        fig.add_trace(go.Scatter(
            x=x_pred,
            y=quantile_values,
            mode='lines+markers',
            name='Median (Q0.5)',
            line=dict(color=color, width=3),
            marker=dict(size=5, symbol='circle'),
            opacity=0.8,
            hovertemplate='Time: %{x}<br>Q0.5: %{y:.4f}<extra></extra>'
        ))
    # Update layout - use autosize for responsive width
    x_axis_title = "Timestamp" if timestamps is not None else "Time Step"
    fig.update_layout(
        title=None,
        xaxis_title=x_axis_title,
        yaxis_title="Value",
        hovermode='x unified',
        autosize=True,  # Use automatic width to make chart responsive to container size
        height=400,
        margin=dict(l=60, r=40, t=60, b=60),  # Set reasonable margins
        legend=dict(
            orientation="h",
            yanchor="bottom",
            y=1.02,
            xanchor="right",
            x=1,
            font=dict(size=14)
        ),
        plot_bgcolor='white',
        xaxis=dict(showgrid=True, gridcolor='lightgray', gridwidth=1),
        yaxis=dict(showgrid=True, gridcolor='lightgray', gridwidth=1)
    )
    # Create info message for prediction window
    if timestamps is not None and test_window_start_idx is not None and test_window_end_idx is not None:
        pred_start_ts = timestamps[test_window_start_idx]
        pred_end_ts = timestamps[test_window_end_idx - 1]  # -1 because end index is exclusive
        # Format with weekday name
        start_str = f"{pred_start_ts.strftime('%Y-%m-%d %H:%M:%S')} ({pred_start_ts.day_name()})"
        end_str = f"{pred_end_ts.strftime('%Y-%m-%d %H:%M:%S')} ({pred_end_ts.day_name()})"
        base_info = (
            f"πŸ“Š Prediction Length: {prediction_length}\n"
            f"πŸ“… Prediction Range: {start_str} β†’ {end_str}\n"
            f"πŸ”„ Dataset Frequency: {dataset_freq}"
        )
    else:
        # NOTE(review): 'dataset_freq' in dir() is always True at this point
        # (it is assigned unconditionally above), so the 'N/A' branch is dead.
        base_info = (
            f"πŸ“Š Prediction Length: {prediction_length}\n"
            f"πŸ“… Prediction Range: index {test_window_start_idx} β†’ {test_window_end_idx - 1}\n"
            f"πŸ”„ Dataset Frequency: {dataset_freq if 'dataset_freq' in dir() else 'N/A'}"
        )
    # Get features information for the selected variate
    # Pattern names from init_per_pattern_tab
    pattern_names = [
        "T_strength", "T_linearity",
        "S_strength", "S_corr",
        "R_ACF1",
        "stationarity", "complexity"
    ]
    features_info = ""
    if not FEATURES_DF.empty and not FEATURES_BOOL_DF.empty:
        # Find matching row in features dataframes
        # Try to match by dataset_id, series_name, variate_name
        feature_row_orig = None
        feature_row_bool = None
        # Match by dataset_id first
        features_subset_orig = FEATURES_DF[FEATURES_DF["dataset_id"] == dataset_term]
        features_subset_bool = FEATURES_BOOL_DF[FEATURES_BOOL_DF["dataset_id"] == dataset_term]
        print(f"πŸ” Features lookup: dataset_term={dataset_term}, series={series}, variate={variate}")
        print(f"πŸ” Features subset size: orig={len(features_subset_orig)}, bool={len(features_subset_bool)}")
        # Try matching by series_name and variate_name (for MTS)
        if not features_subset_orig.empty:
            # Check if series_name matches
            if "series_name" in features_subset_orig.columns:
                series_match_orig = features_subset_orig["series_name"] == series
                if series_match_orig.any():
                    series_matched = features_subset_orig[series_match_orig]
                    print(f"πŸ” Found {len(series_matched)} rows with series_name={series}")
                    # Check if variate_name matches
                    if "variate_name" in series_matched.columns:
                        # For UTS, variate might be "0" or 0, try both
                        variate_str = str(variate)
                        variate_match_orig = (series_matched["variate_name"] == variate_str) | (series_matched["variate_name"] == variate)
                        if variate_match_orig.any():
                            feature_row_orig = series_matched[variate_match_orig].iloc[0]
                            print(f"βœ… Found feature row by series_name + variate_name")
                            # Find corresponding row in bool dataframe
                            if not features_subset_bool.empty and "series_name" in features_subset_bool.columns and "variate_name" in features_subset_bool.columns:
                                series_match_bool = features_subset_bool["series_name"] == series
                                variate_match_bool = (features_subset_bool["variate_name"] == variate_str) | (features_subset_bool["variate_name"] == variate)
                                bool_matched = features_subset_bool[series_match_bool & variate_match_bool]
                                if not bool_matched.empty:
                                    feature_row_bool = bool_matched.iloc[0]
        # If not found, try matching by series_name only (for UTS cases where variate_name might not match)
        if feature_row_orig is None and not features_subset_orig.empty:
            if "series_name" in features_subset_orig.columns:
                series_match_orig = features_subset_orig["series_name"] == series
                if series_match_orig.any():
                    # For UTS, there might be only one row per series
                    series_matched = features_subset_orig[series_match_orig]
                    if len(series_matched) == 1:
                        feature_row_orig = series_matched.iloc[0]
                        print(f"βœ… Found feature row by series_name only (UTS)")
                        # Find corresponding row in bool dataframe
                        if not features_subset_bool.empty and "series_name" in features_subset_bool.columns:
                            series_match_bool = features_subset_bool["series_name"] == series
                            bool_matched = features_subset_bool[series_match_bool]
                            if len(bool_matched) == 1:
                                feature_row_bool = bool_matched.iloc[0]
        # If still not found, try matching by variate_name only (for UTS cases where variate_name == series)
        if feature_row_orig is None and not features_subset_orig.empty:
            if "variate_name" in features_subset_orig.columns:
                variate_match_orig = features_subset_orig["variate_name"] == series  # For UTS, series might be the variate_name
                if variate_match_orig.any():
                    feature_row_orig = features_subset_orig[variate_match_orig].iloc[0]
                    print(f"βœ… Found feature row by variate_name (series as variate_name)")
                    # Find corresponding row in bool dataframe
                    if not features_subset_bool.empty and "variate_name" in features_subset_bool.columns:
                        variate_match_bool = features_subset_bool["variate_name"] == series
                        if variate_match_bool.any():
                            feature_row_bool = features_subset_bool[variate_match_bool].iloc[0]
        if feature_row_orig is None:
            print(f"⚠️ Could not find features for dataset_term={dataset_term}, series={series}, variate={variate}")
            if not features_subset_orig.empty:
                print(f" Available series_names: {features_subset_orig['series_name'].unique()[:10] if 'series_name' in features_subset_orig.columns else 'N/A'}")
                print(f" Available variate_names: {features_subset_orig['variate_name'].unique()[:10] if 'variate_name' in features_subset_orig.columns else 'N/A'}")
        if feature_row_orig is not None:
            # Build features display
            features_orig_items = []
            features_bool_items = []
            for pattern_name in pattern_names:
                # Map pattern name to feature column name
                feature_col = PATTERN_MAP.get(pattern_name, pattern_name)
                # Get original value
                if feature_col in feature_row_orig.index:
                    orig_value = feature_row_orig[feature_col]
                    if pd.notna(orig_value):
                        features_orig_items.append(f"{pattern_name}: {orig_value:.3f}")
                # Get binary value
                if feature_row_bool is not None and feature_col in feature_row_bool.index:
                    bool_value = feature_row_bool[feature_col]
                    if pd.notna(bool_value):
                        features_bool_items.append(f"{pattern_name}: {bool_value}")
            if features_orig_items or features_bool_items:
                features_info = "\n\n πŸ“ Features of variate:\n"
                if features_orig_items:
                    features_info += "- Original Values: " + ", ".join(features_orig_items) + "\n"
                if features_bool_items:
                    features_info += "- Binary Values (0/1): " + ", ".join(features_bool_items)
    info_message = base_info + features_info
    print(f"πŸ“ Info message: {info_message}")
    return fig, info_message
def init_overall_tab():
    """Render the Overall tab: aggregated leaderboard table plus CSV export."""
    gr.Markdown(
        """
        This tab presents each model's overall performance aggregated across all tasks. A **task** is defined as a specific **(dataset, horizon)** pair. For each task, the result is obtained by averaging the metrics across all its variates.
        - **MASE (norm.), CRPS (norm.)**: task-level results are normalized by Seasonal Naive and aggregated by geometric mean.
        - **MASE_rank, CRPS_rank**: for each task, models are ranked by the metric; the average rank across all tasks is then reported.
        """,
        elem_classes="markdown-text"
    )
    # Table is filled once at build time with the MASE-sorted overall board.
    overall_table = gr.DataFrame(
        value=get_overall_leaderboard(DATASETS_DF, metric="MASE"),
        elem_classes="custom-table",
        interactive=False
    )

    # Export callback: recompute the board and dump it to a temp CSV file.
    # (Name kept stable: Gradio derives the endpoint name from fn.__name__.)
    def export_overall_csv():
        return export_dataframe_to_csv(
            get_overall_leaderboard(DATASETS_DF, metric="MASE"),
            filename_prefix="overall_leaderboard",
        )

    with gr.Row():
        export_btn = gr.Button("πŸ“₯ Export CSV", size="sm")
        export_file = gr.File(label="Download CSV", visible=False)
    # Two-step chain: write the CSV first, then reveal the download widget.
    export_btn.click(
        fn=export_overall_csv,
        inputs=[],
        outputs=[export_file]
    ).then(
        fn=lambda: gr.File(visible=True),
        inputs=[],
        outputs=[export_file]
    )
def init_per_dataset_tab(demo):
    """Build the Per Dataset tab and wire its events.

    Args:
        demo: top-level gr.Blocks instance, needed for the startup
            demo.load hook that populates the table on first render.
    """
    gr.Markdown(
        """
        This tab provides flexible analysis at dataset, series, and variate levels.
        - **Dataset only**: Shows both Seasonal Naive-normalized metrics (task-level) and original non-normalized metrics, plus average ranks
        - **Series/Variate selected**: Shows only original metrics.
        - **Horizons**: Select one or more horizons to aggregate results
        """,
        elem_classes="markdown-text"
    )
    # Initialize horizon choices based on first dataset
    initial_dataset = DATASET_CHOICES[0]
    initial_horizons = get_available_horizons(initial_dataset)
    with gr.Row():
        with gr.Column(scale=1):
            horizons = gr.CheckboxGroup(
                choices=initial_horizons,
                value=initial_horizons,
                label="Horizons"
            )
            dataset_dropdown = gr.Dropdown(
                choices=DATASET_CHOICES,
                value=initial_dataset,
                label="Dataset",
                interactive=True
            )
            # Initialize series and variate dropdowns
            series_dropdown, variate_dropdown = update_series_and_variate(
                initial_dataset
            )
    msg = gr.Textbox(label="Message", interactive=False)
    table = gr.DataFrame(elem_classes="custom-table", interactive=False)
    # Update horizons, series, and variate dropdowns when dataset changes.
    # The .then() chain order matters: the table refresh must run only after
    # the dependent dropdowns have been updated.
    dataset_dropdown.change(
        fn=update_horizon_checkbox_choices,
        inputs=[dataset_dropdown],
        outputs=[horizons],
    ).then(
        fn=update_series_and_variate,
        inputs=[dataset_dropdown],
        outputs=[series_dropdown, variate_dropdown],
    ).then(
        fn=get_dataset_multilevel_leaderboard,
        inputs=[dataset_dropdown, series_dropdown, variate_dropdown, horizons],
        outputs=[msg, table]
    )
    # Update leaderboard when series, variate, or horizons change
    for comp in [series_dropdown, variate_dropdown, horizons]:
        comp.change(
            fn=get_dataset_multilevel_leaderboard,
            inputs=[dataset_dropdown, series_dropdown, variate_dropdown, horizons],
            outputs=[msg, table]
        )
    # Load on startup
    demo.load(
        fn=get_dataset_multilevel_leaderboard,
        inputs=[dataset_dropdown, series_dropdown, variate_dropdown, horizons],
        outputs=[msg, table]
    )
    # CSV Export
    def export_dataset_csv(dataset, series, variate, horizons_val):
        # The leaderboard returns (message, dataframe); only the frame is exported.
        _, df = get_dataset_multilevel_leaderboard(dataset, series, variate, horizons_val)
        # Sanitize dataset name for filename (replace / with _)
        safe_dataset_name = dataset.replace("/", "_") if dataset else "unknown"
        return export_dataframe_to_csv(df, filename_prefix=f"dataset_{safe_dataset_name}")
    with gr.Row():
        export_btn = gr.Button("πŸ“₯ Export CSV", size="sm")
        export_file = gr.File(label="Download CSV", visible=False)
    export_btn.click(
        fn=export_dataset_csv,
        inputs=[dataset_dropdown, series_dropdown, variate_dropdown, horizons],
        outputs=[export_file]
    ).then(
        fn=lambda: gr.File(visible=True),
        inputs=[],
        outputs=[export_file]
    )
def init_per_window_tab(demo):
    """Build the "Per Window" analysis tab.

    Lets the user pick a dataset / series / variate / horizon / test window,
    then shows:
      * an interactive Plotly forecast plot for one model (with selectable
        quantile bands),
      * a window-level metrics table (MASE, CRPS, MAE, MSE),
      * a CSV export of that table.

    Args:
        demo: The enclosing ``gr.Blocks`` app; used to register the startup
            ``load`` events that populate the plot and the table.
    """
    gr.Markdown(
        """
This tab enables detailed analysis of model performance at the level of individual testing windows. By selecting a dataset, variate, horizon, and test window, users can examine window-level metrics (MASE, CRPS, MAE, MSE) at fine granularity and visualize the predicted quantiles of a model along with the ground-truth.
- **Interactive Visualization**: Zoom, pan, autoscale and download the plot.
- 🟦 Train Split 🟨 Test Split 🟥 Prediction Window
"""
    )
    # Quantile *pairs* (lower-upper band) plus the 0.5 median line.
    QUANTILE_PAIR_CHOICES = ["0.1-0.9", "0.2-0.8", "0.3-0.7", "0.4-0.6", "0.5"]
    initial_quantiles = ["0.5"]
    with gr.Row():
        with gr.Column(scale=1):
            # Initialize horizon choices based on first dataset
            initial_dataset = DATASET_CHOICES[0] if DATASET_CHOICES else None
            initial_horizons = get_available_horizons(initial_dataset) if initial_dataset else ALL_HORIZONS
            horizons = gr.Radio(
                choices=initial_horizons,
                # Prefer "short"; otherwise fall back to the first available
                # horizon ("short" again only if the list is empty).
                value="short" if "short" in initial_horizons else (initial_horizons[0] if initial_horizons else "short"),
                label="Horizons"
            )
            # Dropdown for dataset selection
            dataset_dropdown = gr.Dropdown(
                choices=DATASET_CHOICES,
                value=DATASET_CHOICES[0] if DATASET_CHOICES else None,  # Select first by default
                label="Dataset",
                interactive=True
            )
            # Initialize series, variate, window dropdowns using function
            series_dropdown, variate_dropdown, window_dropdown = update_series_variate_and_window(
                dataset_dropdown.value, horizons.value
            )
        with gr.Column(scale=2):
            with gr.Row():
                with gr.Column(scale=2):
                    quantiles = gr.CheckboxGroup(
                        choices=QUANTILE_PAIR_CHOICES,
                        value=initial_quantiles,
                        label="Select Quantiles for Visualization"
                    )
                with gr.Column(scale=1):
                    model = gr.Dropdown(
                        choices=ALL_MODELS,
                        # Guard against an empty model list (consistent with
                        # the DATASET_CHOICES guard above) instead of raising
                        # IndexError while the UI is being built.
                        value=ALL_MODELS[0] if ALL_MODELS else None,
                        label="Select Model for Visualization",
                        interactive=True
                    )
            ts_visualization = gr.Plot()
            # Message box for prediction window info
            prediction_info = gr.Textbox(
                label="Info",
                interactive=False,
                lines=3
            )
    table_window = gr.DataFrame(elem_classes="custom-table", interactive=False)

    # ── Shared input / output lists ────────────────────────────────────
    _plot_in = [dataset_dropdown, series_dropdown, variate_dropdown,
                window_dropdown, horizons, quantiles, model]
    _plot_out = [ts_visualization, prediction_info]
    _tbl_in = [dataset_dropdown, series_dropdown, variate_dropdown,
               window_dropdown, horizons]
    _tbl_out = table_window

    # ── dataset changes ─────────────────────────────────────────────────
    # Chain: update horizons → update dropdowns → refresh plot → refresh table.
    # The chain already calls plot & table at the end, so we do NOT bind
    # separate .change() on series/variate/window for this trigger path —
    # otherwise updating the 3 dropdowns cascades into 3 extra duplicate
    # plot_window_series calls (the #1 cause of slowness on HF Space).
    dataset_dropdown.change(
        fn=update_horizon_choices,
        inputs=[dataset_dropdown],
        outputs=[horizons],
    ).then(
        fn=update_series_variate_and_window,
        inputs=[dataset_dropdown, horizons],
        outputs=[series_dropdown, variate_dropdown, window_dropdown],
    ).then(
        fn=plot_window_series, inputs=_plot_in, outputs=_plot_out,
    ).then(
        fn=get_window_leaderboard, inputs=_tbl_in, outputs=_tbl_out,
    )

    # ── horizon changes ─────────────────────────────────────────────────
    horizons.change(
        fn=update_series_variate_and_window,
        inputs=[dataset_dropdown, horizons],
        outputs=[series_dropdown, variate_dropdown, window_dropdown],
    ).then(
        fn=plot_window_series, inputs=_plot_in, outputs=_plot_out,
    ).then(
        fn=get_window_leaderboard, inputs=_tbl_in, outputs=_tbl_out,
    )

    # ── series / variate / window manual changes ────────────────────────
    # Use a single .then() chain per dropdown so each user-initiated
    # change fires plot + table exactly ONCE instead of 2 separate events.
    for comp in [series_dropdown, variate_dropdown, window_dropdown]:
        comp.change(
            fn=plot_window_series, inputs=_plot_in, outputs=_plot_out,
        ).then(
            fn=get_window_leaderboard, inputs=_tbl_in, outputs=_tbl_out,
        )

    # ── quantiles / model changes ───────────────────────────────────────
    # Only the plot depends on quantiles/model; the metrics table does not.
    for comp in [quantiles, model]:
        comp.change(
            fn=plot_window_series, inputs=_plot_in, outputs=_plot_out,
        )

    # ── initial page load ───────────────────────────────────────────────
    demo.load(
        fn=plot_window_series, inputs=_plot_in, outputs=_plot_out,
    )
    demo.load(
        fn=get_window_leaderboard, inputs=_tbl_in, outputs=_tbl_out,
    )

    # CSV Export
    def export_window_csv(dataset, series, variate, window, horizon):
        """Recompute the window leaderboard and write it to a temp CSV."""
        df = get_window_leaderboard(dataset, series, variate, window, horizon)
        return export_dataframe_to_csv(df, filename_prefix="window_leaderboard")

    with gr.Row():
        export_btn = gr.Button("📥 Export CSV", size="sm")
        export_file = gr.File(label="Download CSV", visible=False)
    export_btn.click(
        fn=export_window_csv,
        inputs=[dataset_dropdown, series_dropdown, variate_dropdown, window_dropdown, horizons],
        outputs=[export_file]
    ).then(
        # Reveal the (initially hidden) download widget once the file exists.
        fn=lambda: gr.File(visible=True),
        inputs=[],
        outputs=[export_file]
    )
def init_per_pattern_tab(demo):
    """Build the "Per Pattern" tab.

    Users toggle boolean pattern filters derived from time-series features;
    the leaderboard aggregates model performance over all variates that
    match every selected pattern.

    Args:
        demo: The enclosing ``gr.Blocks`` app; used to register the startup
            ``load`` event that fills the leaderboard table.
    """
    gr.Markdown(
        """
This tab allows you to explore model performance based on **selected patterns**.
Select patterns to filter variates that exhibit those characteristics, then view aggregated model performance.
Each pattern is a **boolean indicator** derived from time series features (binarized by **median** threshold for continuous features).
- **Patterns are intersected**: A variate must exhibit ALL selected patterns to be included.
- **MASE (norm.), CRPS (norm.)**: variate-level results are normalized by Seasonal Naive and aggregated by geometric mean across all matching variates.
- **MASE (raw), CRPS (raw)**: arithmetic mean across all matching variates.
""",
        elem_classes="markdown-text"
    )
    # Each radio offers: no filter, must-equal-1, must-equal-0.
    PATTERN_CHOICES = ["N/A", "=1", "=0"]

    # (group heading, feature names) — one UI column per feature family.
    feature_groups = [
        ("### 📈 Trend Features", ["T_strength", "T_linearity"]),
        ("### 🔄 Seasonal Features", ["S_strength", "S_corr"]),
        ("### 🎯 Residual Features", ["R_ACF1"]),
        ("### ⚙️ Global Features", ["stationarity", "complexity"]),
    ]

    # Build all pattern radios from the spec; keep creation order aligned
    # with pattern_names so callbacks can zip them back together.
    pattern_radios = []
    pattern_names = []
    with gr.Row():  # TSFeatures
        for heading, group_features in feature_groups:
            with gr.Column(scale=1):
                with gr.Group():
                    gr.Markdown(heading)
                    for feature in group_features:
                        pattern_radios.append(
                            gr.Radio(choices=PATTERN_CHOICES, value="N/A", label=feature)
                        )
                        pattern_names.append(feature)

    with gr.Row():
        with gr.Column(scale=1):
            horizons = gr.CheckboxGroup(
                choices=ALL_HORIZONS,
                value=ALL_HORIZONS,
                label="Horizons"
            )
        with gr.Column(scale=2):
            msg_pattern = gr.Textbox(label="Status", interactive=False, lines=4)
    table_variates = gr.DataFrame(elem_classes="custom-table", interactive=False)

    def merge_patterns(*radio_values):
        """Map radio selections to a ``{feature: 0|1}`` filter dict.

        "N/A" selections impose no constraint and are simply omitted.
        """
        required = {"=1": 1, "=0": 0}
        return {
            name: required[value]
            for name, value in zip(pattern_names, radio_values)
            if value in required
        }

    def update_leaderboard(*args):
        """Refresh the pattern leaderboard.

        ``args`` is every radio value followed by the horizons selection.
        """
        *radio_values, horizons_val = args
        return get_pattern_leaderboard(merge_patterns(*radio_values), horizons_val)

    # Any radio — or the horizon selector — triggers a refresh.
    all_inputs = pattern_radios + [horizons]
    for comp in all_inputs:
        comp.change(
            fn=update_leaderboard,
            inputs=all_inputs,
            outputs=[msg_pattern, table_variates]
        )

    # Populate the table on startup.
    demo.load(
        fn=update_leaderboard,
        inputs=all_inputs,
        outputs=[msg_pattern, table_variates]
    )

    # CSV Export
    def export_pattern_csv(*args):
        """Write the current pattern leaderboard to a temp CSV for download."""
        *radio_values, horizons_val = args
        _, df = get_pattern_leaderboard(merge_patterns(*radio_values), horizons_val)
        return export_dataframe_to_csv(df, filename_prefix="pattern_leaderboard")

    with gr.Row():
        export_btn = gr.Button("📥 Export CSV", size="sm")
        export_file = gr.File(label="Download CSV", visible=False)
    export_btn.click(
        fn=export_pattern_csv,
        inputs=all_inputs,
        outputs=[export_file]
    ).then(
        fn=lambda: gr.File(visible=True),
        inputs=[],
        outputs=[export_file]
    )