Spaces:
Running
Running
| import os | |
| import sys | |
| import plotly.graph_objects as go | |
# Make the `timebench` package importable in both supported layouts:
#   * Space deployment: timebench is copied next to this package, inside
#     the leaderboard_app directory.
#   * Local development: timebench lives under <project root>/src.
# This file sits in leaderboard_app/src/, so walk upwards from here.
current_dir = os.path.dirname(os.path.abspath(__file__))
leaderboard_app_dir = os.path.dirname(current_dir)
project_root = os.path.dirname(leaderboard_app_dir)
src_dir = os.path.join(project_root, "src")

# Each entry is pushed to the front of sys.path, so the LAST entry listed here
# ends up with the highest import priority. The src directory is optional and
# only added when it exists on disk.
_bootstrap_paths = [leaderboard_app_dir, project_root]
if os.path.exists(src_dir):
    _bootstrap_paths.append(src_dir)
for _bootstrap_path in _bootstrap_paths:
    if _bootstrap_path not in sys.path:
        sys.path.insert(0, _bootstrap_path)
| import json | |
| import gradio as gr | |
| from src.about import DATASET_CHOICES, ALL_MODELS, RESULTS_ROOT, FEATURES_DF, FEATURES_BOOL_DF, PATTERN_MAP | |
| from src.leaderboard import (get_overall_leaderboard, get_dataset_multilevel_leaderboard, | |
| get_window_leaderboard, get_pattern_leaderboard, resolve_dataset_id, | |
| _get_dataset_metadata, _load_predictions_cached) | |
| from src.about import DATASETS_DF, ALL_HORIZONS | |
| # get_datasets_root, get_config_root no longer needed here — handled by _get_dataset_metadata | |
| import numpy as np | |
| import pandas as pd | |
| from pathlib import Path | |
| import ast | |
| import matplotlib | |
| matplotlib.use('Agg') # Use non-interactive backend for Gradio | |
| import yaml | |
| import tempfile | |
| # Dataset, get_dataset_settings, load_dataset_config no longer needed here — handled by _get_dataset_metadata | |
| from src.leaderboard import find_dataset_term_path | |
def export_dataframe_to_csv(df, filename_prefix="leaderboard"):
    """Export a DataFrame to a temporary CSV file and return the path for download.

    The file is created with ``delete=False`` so it outlives this call; the
    caller (or the OS temp-dir cleanup) is responsible for removing it.

    Args:
        df: pandas DataFrame to export.
        filename_prefix: prefix for the temporary file name.

    Returns:
        str: path to the temporary CSV file, or None if ``df`` is None or empty.
    """
    if df is None or getattr(df, "empty", False):
        return None

    # newline="" lets the csv writer control line endings (otherwise Windows
    # produces "\r\r\n"); an explicit UTF-8 encoding keeps non-ASCII model or
    # dataset names from failing under a non-UTF-8 locale.
    with tempfile.NamedTemporaryFile(
        mode="w",
        suffix=".csv",
        prefix=f"{filename_prefix}_",
        delete=False,
        encoding="utf-8",
        newline="",
    ) as handle:
        df.to_csv(handle, index=False)
    return handle.name
| ########################## Dataset Tab ########################## | |
def update_series_and_variate(display_name):
    """
    Build the "Select Series" / "Select Variate" dropdowns for the merged Dataset tab.

    Both dropdowns always start on the "---" placeholder. When the dataset or
    its metadata cannot be resolved, both dropdowns contain only the placeholder.
    For a dataset flagged ``is_uts`` the variate dropdown is non-interactive.

    Args:
        display_name: Dataset display name from the UI dropdown (resolved to a
            dataset_id by ``find_dataset_term_path``).

    Returns:
        tuple: (series dropdown, variate dropdown) as ``gr.Dropdown`` components.
    """
    placeholder = "---"

    def make_dropdown(label, options, enabled=True):
        return gr.Dropdown(choices=options, value=placeholder, label=label, interactive=enabled)

    def placeholder_pair():
        return (
            make_dropdown("Select Series", [placeholder]),
            make_dropdown("Select Variate", [placeholder]),
        )

    # Any model will do for discovering the dataset layout; take the first one.
    probe_model = ALL_MODELS[0]
    term = find_dataset_term_path(str(RESULTS_ROOT), probe_model, display_name)
    if term is None:
        print(f"Error: dataset_term is None for display_name={display_name}, model_name={probe_model}")
        return placeholder_pair()

    # Series/variate names do not depend on the horizon, so "short" is used.
    meta = _get_dataset_metadata(term, "short")
    if meta is None:
        return placeholder_pair()

    series_options = [placeholder, *(str(entry) for entry in meta["series_names"])]
    if meta["is_uts"]:
        # Univariate: nothing to choose, so the variate dropdown is locked.
        variate_options, variate_enabled = [placeholder], False
    else:
        variate_options = [placeholder, *(str(entry) for entry in meta["variate_names"])]
        variate_enabled = True

    return (
        make_dropdown("Select Series", series_options),
        make_dropdown("Select Variate", variate_options, variate_enabled),
    )
| ########################## Window Tab ########################## | |
def get_available_horizons(display_name):
    """
    Return the horizons for which a dataset has entries in ``DATASETS_DF``.

    Args:
        display_name: Dataset display name from the UI dropdown.

    Returns:
        list: Horizons in ``ALL_HORIZONS`` order. ``ALL_HORIZONS`` itself is
        returned when ``DATASETS_DF`` is empty or has no row for the dataset;
        ``["short"]`` is returned when rows exist but none of their horizons
        appear in ``ALL_HORIZONS``.
    """
    if DATASETS_DF.empty:
        return ALL_HORIZONS

    dataset_id = resolve_dataset_id(display_name)
    dataset_rows = DATASETS_DF.loc[DATASETS_DF["dataset_id"] == dataset_id]
    if dataset_rows.empty:
        # Unknown dataset: fall back to offering every horizon.
        return ALL_HORIZONS

    present = set(dataset_rows["horizon"].unique().tolist())
    # Walk ALL_HORIZONS so the result keeps its canonical ordering.
    ordered = [level for level in ALL_HORIZONS if level in present]
    return ordered or ["short"]
def update_horizon_choices(display_name):
    """
    Build a horizon Radio restricted to the horizons available for a dataset.

    The selected value is "short" whenever it is available (or when nothing is
    available at all); otherwise it is the first available horizon.

    Args:
        display_name: Dataset display name from the UI dropdown.

    Returns:
        gr.Radio: Radio component carrying the new choices and value.
    """
    horizons = get_available_horizons(display_name)

    if "short" in horizons or not horizons:
        default_horizon = "short"
    else:
        default_horizon = horizons[0]

    # Offer only the available horizons, in ALL_HORIZONS order.
    radio_choices = [level for level in ALL_HORIZONS if level in horizons]
    return gr.Radio(choices=radio_choices, value=default_horizon)
def update_horizon_checkbox_choices(display_name):
    """
    Build the Per Dataset tab's horizon CheckboxGroup for a dataset.

    Only horizons available for the dataset are offered, in ``ALL_HORIZONS``
    order, and all of them are ticked by default.

    Args:
        display_name: Dataset display name from the UI dropdown.

    Returns:
        gr.CheckboxGroup: CheckboxGroup component with updated choices and value.
    """
    horizons = get_available_horizons(display_name)
    offered = [level for level in ALL_HORIZONS if level in horizons]
    return gr.CheckboxGroup(choices=offered, value=offered)
def update_series_variate_and_window(display_name, horizon):
    """
    Update series, variate, and window dropdown options for a dataset and horizon.

    Series and variate names come from the cached dataset metadata; window
    choices are the stringified indices ``0 .. num_windows - 1``. Each dropdown
    is pre-selected on its first choice, or on ``None`` when it has no choices
    (e.g. metadata reporting zero windows), instead of raising ``IndexError``.

    Args:
        display_name: Dataset display name from UI dropdown (will be resolved to dataset_id)
        horizon: Horizon name (short, medium, long)

    Returns:
        tuple: (series dropdown, variate dropdown, window dropdown) as
        ``gr.Dropdown`` components. All three are empty and non-interactive
        when the dataset or its metadata cannot be resolved.
    """

    def first_or_none(options):
        return options[0] if options else None

    def disabled_dropdowns():
        return (
            gr.Dropdown(choices=[], value=None, label="Select Series", interactive=False),
            gr.Dropdown(choices=[], value=None, label="Select Variate", interactive=False),
            gr.Dropdown(choices=[], value=None, label="Select Testing Window", interactive=False),
        )

    # Use first available model to get data
    model_name = ALL_MODELS[0]

    # Find dataset_term (handles display_name -> dataset_id conversion)
    dataset_term = find_dataset_term_path(str(RESULTS_ROOT), model_name, display_name)
    if dataset_term is None:
        print(f"Error: dataset_term is None for display_name={display_name}, horizon={horizon}, model_name={model_name}")
        return disabled_dropdowns()

    # Use cached metadata
    metadata = _get_dataset_metadata(dataset_term, horizon)
    if metadata is None:
        return disabled_dropdowns()

    windows = [str(i) for i in range(metadata["num_windows"])]
    series_list = [str(name) for name in metadata["series_names"]]

    if metadata["is_uts"]:
        # UTS mode: each series is a single variate, so variate is always 0
        variates_list, variate_interactive = ["0"], False
    else:
        # MTS mode: multiple variates per series
        variates_list = [str(name) for name in metadata["variate_names"]]
        variate_interactive = True

    return (
        gr.Dropdown(choices=series_list, value=first_or_none(series_list), label="Select Series", interactive=True),
        gr.Dropdown(choices=variates_list, value=first_or_none(variates_list), label="Select Variate", interactive=variate_interactive),
        gr.Dropdown(choices=windows, value=first_or_none(windows), label="Select Testing Window", interactive=True),
    )
def _message_figure(title):
    """Return an empty Plotly figure whose title carries a status/error message."""
    fig = go.Figure()
    fig.update_layout(title=title)
    return fig


def _build_features_info(dataset_term, series, variate):
    """Return the "Features of variate" text for the info panel, or "" if none is found.

    Rows of FEATURES_DF / FEATURES_BOOL_DF are filtered by ``dataset_id`` and
    then matched with three strategies, in order:
      1. ``series_name`` == series AND ``variate_name`` == variate
      2. ``series_name`` == series, accepted only when exactly one row matches
      3. ``variate_name`` == series
    """
    if FEATURES_DF.empty or FEATURES_BOOL_DF.empty:
        return ""

    # Pattern names from init_per_pattern_tab
    pattern_names = [
        "T_strength", "T_linearity",
        "S_strength", "S_corr",
        "R_ACF1",
        "stationarity", "complexity"
    ]

    feature_row_orig = None
    feature_row_bool = None

    # Match by dataset_id first
    features_subset_orig = FEATURES_DF[FEATURES_DF["dataset_id"] == dataset_term]
    features_subset_bool = FEATURES_BOOL_DF[FEATURES_BOOL_DF["dataset_id"] == dataset_term]
    print(f"π Features lookup: dataset_term={dataset_term}, series={series}, variate={variate}")
    print(f"π Features subset size: orig={len(features_subset_orig)}, bool={len(features_subset_bool)}")

    # Strategy 1: series_name + variate_name (MTS)
    if not features_subset_orig.empty and "series_name" in features_subset_orig.columns:
        series_match_orig = features_subset_orig["series_name"] == series
        if series_match_orig.any():
            series_matched = features_subset_orig[series_match_orig]
            print(f"π Found {len(series_matched)} rows with series_name={series}")
            if "variate_name" in series_matched.columns:
                # The stored variate name may be a string or the raw value; try both
                variate_str = str(variate)
                variate_match_orig = (series_matched["variate_name"] == variate_str) | (series_matched["variate_name"] == variate)
                if variate_match_orig.any():
                    feature_row_orig = series_matched[variate_match_orig].iloc[0]
                    print(f"β Found feature row by series_name + variate_name")
                    # Find corresponding row in bool dataframe
                    if (
                        not features_subset_bool.empty
                        and "series_name" in features_subset_bool.columns
                        and "variate_name" in features_subset_bool.columns
                    ):
                        series_match_bool = features_subset_bool["series_name"] == series
                        variate_match_bool = (features_subset_bool["variate_name"] == variate_str) | (features_subset_bool["variate_name"] == variate)
                        bool_matched = features_subset_bool[series_match_bool & variate_match_bool]
                        if not bool_matched.empty:
                            feature_row_bool = bool_matched.iloc[0]

    # Strategy 2: series_name only (UTS cases where variate_name might not match)
    if feature_row_orig is None and not features_subset_orig.empty:
        if "series_name" in features_subset_orig.columns:
            series_match_orig = features_subset_orig["series_name"] == series
            if series_match_orig.any():
                series_matched = features_subset_orig[series_match_orig]
                # Only unambiguous when the series has exactly one row
                if len(series_matched) == 1:
                    feature_row_orig = series_matched.iloc[0]
                    print(f"β Found feature row by series_name only (UTS)")
                    if not features_subset_bool.empty and "series_name" in features_subset_bool.columns:
                        series_match_bool = features_subset_bool["series_name"] == series
                        bool_matched = features_subset_bool[series_match_bool]
                        if len(bool_matched) == 1:
                            feature_row_bool = bool_matched.iloc[0]

    # Strategy 3: variate_name only (UTS cases where variate_name == series)
    if feature_row_orig is None and not features_subset_orig.empty:
        if "variate_name" in features_subset_orig.columns:
            variate_match_orig = features_subset_orig["variate_name"] == series
            if variate_match_orig.any():
                feature_row_orig = features_subset_orig[variate_match_orig].iloc[0]
                print(f"β Found feature row by variate_name (series as variate_name)")
                if not features_subset_bool.empty and "variate_name" in features_subset_bool.columns:
                    variate_match_bool = features_subset_bool["variate_name"] == series
                    if variate_match_bool.any():
                        feature_row_bool = features_subset_bool[variate_match_bool].iloc[0]

    if feature_row_orig is None:
        print(f"β οΈ Could not find features for dataset_term={dataset_term}, series={series}, variate={variate}")
        if not features_subset_orig.empty:
            print(f" Available series_names: {features_subset_orig['series_name'].unique()[:10] if 'series_name' in features_subset_orig.columns else 'N/A'}")
            print(f" Available variate_names: {features_subset_orig['variate_name'].unique()[:10] if 'variate_name' in features_subset_orig.columns else 'N/A'}")
        return ""

    # Build features display
    features_orig_items = []
    features_bool_items = []
    for pattern_name in pattern_names:
        # Map pattern name to feature column name
        feature_col = PATTERN_MAP.get(pattern_name, pattern_name)
        if feature_col in feature_row_orig.index:
            orig_value = feature_row_orig[feature_col]
            if pd.notna(orig_value):
                features_orig_items.append(f"{pattern_name}: {orig_value:.3f}")
        if feature_row_bool is not None and feature_col in feature_row_bool.index:
            bool_value = feature_row_bool[feature_col]
            if pd.notna(bool_value):
                features_bool_items.append(f"{pattern_name}: {bool_value}")

    if not (features_orig_items or features_bool_items):
        return ""

    features_info = "\n\n π Features of variate:\n"
    if features_orig_items:
        features_info += "- Original Values: " + ", ".join(features_orig_items) + "\n"
    if features_bool_items:
        features_info += "- Binary Values (0/1): " + ", ".join(features_bool_items)
    return features_info


def plot_window_series(display_name, series, variate, window_id, horizon, selected_quantiles, model):
    """
    Plot quantile predictions for one test window on top of the full time series.

    The figure shows the full series, shaded training / test regions, the
    selected test window highlighted with its ground truth, and the requested
    quantile bands and/or median. Series and variate are accepted as names and
    converted to indices through the cached dataset metadata; a value that is
    not a known name is interpreted as an integer index.

    Every failure that can be attributed to the selection (missing parameter,
    unknown dataset, missing predictions or metadata, unparsable or
    out-of-range series/variate/window) yields an empty figure whose title
    describes the problem, together with an empty info message.

    Args:
        display_name: Dataset display name from UI dropdown (will be resolved to dataset_id)
        series: Series name (or integer index as a string)
        variate: Variate name (or integer index as a string); ignored for UTS datasets
        window_id: Window index
        horizon: Horizon name
        selected_quantiles: Quantile selections to plot: band labels such as
            "0.1-0.9" and/or "0.5" for the median. ``None`` is treated as empty.
        model: Model name

    Returns:
        tuple: (fig, info_message) where fig is a Plotly figure and info_message
        describes the prediction window and, when available, the variate's features.
    """
    print(f"π plot_window_series called: display_name={display_name}, series={series}, variate={variate}, window_id={window_id}, horizon={horizon}, model={model}")
    if display_name is None or series is None or variate is None or window_id is None:
        print("β Missing parameters")
        return _message_figure("Please select all parameters"), ""

    # Treat "nothing selected" the same whether it arrives as [] or None
    selected_quantiles = selected_quantiles or []

    results_root = str(RESULTS_ROOT)
    print(f"π results_root: {results_root}")
    dataset_term = find_dataset_term_path(results_root, model, display_name)
    print(f"π dataset_term: {dataset_term}")
    if dataset_term is None:
        print("β Dataset not found")
        return _message_figure("Dataset not found"), ""

    # --- Cached predictions loading (biggest I/O in Per Test Window) ---
    pred_data = _load_predictions_cached(model, dataset_term, horizon)
    if pred_data is None:
        print(f"β Predictions file not found for {model}/{dataset_term}/{horizon}")
        return _message_figure("Predictions file not found for this horizon"), ""

    # Indexed below as [series, window, quantile, variate, time step]
    predictions_quantiles = pred_data["predictions_quantiles"]
    quantile_levels = pred_data["quantile_levels"]

    # Load prediction scale factor from config.json (for float16 overflow prevention)
    model_config_path = os.path.join(results_root, model, dataset_term, horizon, "config.json")
    prediction_scale_factor = 1.0
    if os.path.exists(model_config_path):
        with open(model_config_path, "r", encoding="utf-8") as f:
            model_config = json.load(f)
        prediction_scale_factor = model_config.get("prediction_scale_factor", 1.0)

    # Use cached metadata for name-to-index mappings and Dataset object
    metadata = _get_dataset_metadata(dataset_term, horizon)
    if metadata is None:
        print("β Failed to load dataset metadata")
        return _message_figure("Failed to load dataset metadata"), ""

    dataset_obj = metadata["dataset_obj"]
    dataset_freq = metadata["freq"]
    test_length = dataset_obj._test_length
    series_name_to_idx = metadata["series_name_to_idx"]
    variate_name_to_idx = metadata["variate_name_to_idx"]
    print(f"β Dataset loaded from cache: {len(dataset_obj.hf_dataset)} series")
    print(f"π Dataset frequency: {dataset_freq}")

    # Convert series / variate names and the window id to integer indices.
    # A value that is neither a known name nor an integer is a bad selection,
    # reported to the user instead of surfacing as an unhandled ValueError.
    try:
        if series in series_name_to_idx:
            series_idx = series_name_to_idx[series]
            print(f"β Found series '{series}' at index {series_idx}")
        else:
            series_idx = int(series)
            print(f"β οΈ Series '{series}' not found in names, using int index {series_idx}")

        if metadata["is_uts"]:
            variate_idx = 0
            print(f"βΉοΈ UTS mode, variate_idx=0")
        elif variate in variate_name_to_idx:
            variate_idx = variate_name_to_idx[variate]
            print(f"β Found variate '{variate}' at index {variate_idx}")
        else:
            variate_idx = int(variate)
            print(f"β οΈ Variate '{variate}' not found in names, using int index {variate_idx}")

        window_idx = int(window_id)
    except (TypeError, ValueError):
        print(f"Invalid selection: series={series}, variate={variate}, window_id={window_id}")
        return _message_figure("Invalid series, variate, or window selection"), ""

    # Get pre-computed quantiles for this specific series, window, and variate
    try:
        quantiles_data = predictions_quantiles[series_idx, window_idx, :, variate_idx, :]
    except IndexError:
        print(f"Selection out of range: series_idx={series_idx}, window_idx={window_idx}, variate_idx={variate_idx}")
        return _message_figure("Selected series, variate, or window is out of range"), ""

    if prediction_scale_factor != 1.0:
        print(f"π Applying inverse scale factor: {prediction_scale_factor}")
        # Scale only the selected slice: astype() copies, so the cached array
        # is never mutated and the full array is never duplicated.
        quantiles_data = quantiles_data.astype(np.float32) * prediction_scale_factor

    prediction_length = quantiles_data.shape[1]

    # Create mapping from quantile level string to index
    quantile_level_to_idx = {f"{q:.1f}": i for i, q in enumerate(quantile_levels)}

    # Get full target time series for this series
    print(f"π Getting target for series_idx={series_idx}, variate_idx={variate_idx}")
    full_target = dataset_obj.hf_dataset[series_idx]["target"]
    print(f"π full_target shape: {full_target.shape}, dtype: {full_target.dtype}")
    print(f"π full_target first 10 values (all variates): {full_target[:, :10] if full_target.ndim > 1 else full_target[:10]}")

    # Get start timestamp for this series and create timestamp array
    series_start = dataset_obj.hf_dataset[series_idx]["start"]
    print(f"π Series start timestamp: {series_start}, type: {type(series_start)}")
    # Handle numpy array containing datetime64 (common when reading from HF dataset)
    if isinstance(series_start, np.ndarray):
        series_start = series_start.item() if series_start.ndim == 0 else series_start[0]
        print(f"π Extracted scalar: {series_start}, type: {type(series_start)}")
    # Convert numpy datetime64 to pandas Timestamp
    if isinstance(series_start, (np.datetime64, str)):
        series_start = pd.Timestamp(series_start)

    # For a 2-D target the time axis is axis 1 (variates on axis 0)
    if full_target.ndim > 1:
        ts_length = full_target.shape[1]
    else:
        ts_length = len(full_target)

    # Create timestamp array for the entire series; on any failure fall back
    # to plain integer positions on the x-axis (deliberate best effort).
    try:
        timestamps = pd.date_range(start=series_start, periods=ts_length, freq=dataset_freq)
        print(f"π Created timestamp array: {timestamps[0]} to {timestamps[-1]}")
    except Exception as e:
        print(f"β οΈ Failed to create timestamps: {e}, falling back to indices")
        timestamps = None

    # Handle multivariate case: extract specific variate
    if full_target.ndim > 1:
        full_series = full_target[variate_idx, :]
    else:
        full_series = full_target
    print(f"π full_series shape: {full_series.shape}, min: {full_series.min()}, max: {full_series.max()}, has_nan: {np.isnan(full_series).any()}")

    # Test data starts at: series_length - test_length
    series_length = len(full_series)
    train_end_idx = series_length - test_length

    # Windows are laid out back to back from the start of the test region
    test_window_start_idx = train_end_idx + window_idx * prediction_length
    test_window_end_idx = test_window_start_idx + prediction_length

    # Create Plotly figure
    fig = go.Figure()

    # Quantile colors - from light to dark
    quantile_colors = {
        "0.1": "#c6dbef", "0.9": "#c6dbef",  # lightest
        "0.2": "#6baed6", "0.8": "#6baed6",  # light
        "0.3": "#4292c6", "0.7": "#4292c6",  # medium
        "0.4": "#2171b5", "0.6": "#2171b5",  # dark
        "0.5": "#08306b",  # darkest (median)
    }

    # Prediction time steps overlay the test window
    pred_time_steps = np.arange(test_window_start_idx, test_window_end_idx)

    # Use timestamps for x-axis if available
    if timestamps is not None:
        x_full = timestamps
        x_pred = timestamps[pred_time_steps]
        x_window = timestamps[test_window_start_idx:test_window_end_idx]
    else:
        x_full = np.arange(len(full_series))
        x_pred = pred_time_steps
        x_window = np.arange(test_window_start_idx, test_window_end_idx)

    # Plot full series in light gray
    fig.add_trace(go.Scatter(
        x=x_full,
        y=full_series,
        mode='lines',
        name='Full Time Series',
        line=dict(color='gray', width=1),
        opacity=0.6,
        hovertemplate='Time: %{x}<br>Value: %{y:.4f}<extra></extra>'
    ))

    # Training region
    x0_train = timestamps[0] if timestamps is not None else 0
    x1_train = timestamps[train_end_idx] if timestamps is not None else train_end_idx
    fig.add_shape(
        type="rect",
        x0=x0_train, x1=x1_train,
        y0=0, y1=1, yref="paper",
        fillcolor="blue", opacity=0.1,
        layer="below", line_width=0,
    )

    # Test region
    test_region_end = len(full_series)
    x0_test = timestamps[train_end_idx] if timestamps is not None else train_end_idx
    x1_test = timestamps[test_region_end - 1] if timestamps is not None else test_region_end - 1
    fig.add_shape(
        type="rect",
        x0=x0_test, x1=x1_test,
        y0=0, y1=1, yref="paper",
        fillcolor="orange", opacity=0.15,
        layer="below", line_width=0,
    )

    # Highlight current test window
    x0_window = timestamps[test_window_start_idx] if timestamps is not None else test_window_start_idx
    x1_window = timestamps[test_window_end_idx - 1] if timestamps is not None else test_window_end_idx - 1
    fig.add_shape(
        type="rect",
        x0=x0_window, x1=x1_window,
        y0=0, y1=1, yref="paper",
        fillcolor="red", opacity=0.2,
        layer="below", line_width=0,
    )

    # Plot the test window portion of full series
    window_series = full_series[test_window_start_idx:test_window_end_idx]
    fig.add_trace(go.Scatter(
        x=x_window,
        y=window_series,
        mode='lines',
        name='Ground Truth (Window)',
        line=dict(color='red', width=2),
        opacity=0.8,
        hovertemplate='Time: %{x}<br>Value: %{y:.4f}<extra></extra>'
    ))

    # Quantile pairs mapping: UI selection -> (low, high) quantile values
    quantile_pair_map = {
        "0.1-0.9": ("0.1", "0.9"),
        "0.2-0.8": ("0.2", "0.8"),
        "0.3-0.7": ("0.3", "0.7"),
        "0.4-0.6": ("0.4", "0.6"),
    }

    def get_quantile_values(q_str):
        """Return the pre-computed values for a quantile level, or None if absent."""
        level_idx = quantile_level_to_idx.get(q_str)
        if level_idx is None:
            return None
        return quantiles_data[level_idx, :]

    # Plot quantile pairs with fill (based on paired selection)
    for pair_str, (q_low_str, q_high_str) in quantile_pair_map.items():
        if pair_str not in selected_quantiles:
            continue
        quantile_low = get_quantile_values(q_low_str)
        quantile_high = get_quantile_values(q_high_str)
        if quantile_low is None or quantile_high is None:
            # The predictions file does not carry this level; skip the band
            continue
        color = quantile_colors.get(q_low_str, "#2171b5")

        # Filled area between quantiles: upper edge forward, lower edge backward
        fig.add_trace(go.Scatter(
            x=list(x_pred) + list(x_pred[::-1]),
            y=list(quantile_high) + list(quantile_low[::-1]),
            fill='toself',
            fillcolor=color,
            line=dict(color='rgba(255,255,255,0)'),
            hoverinfo="skip",
            showlegend=True,
            name=f'Q{q_low_str}-Q{q_high_str}',
            opacity=0.3
        ))
        # Lower quantile line
        fig.add_trace(go.Scatter(
            x=x_pred,
            y=quantile_low,
            mode='lines',
            name=f'Q{q_low_str}',
            line=dict(color=color, width=1),
            opacity=0.7,
            showlegend=False,
            hovertemplate=f'Time: %{{x}}<br>Q{q_low_str}: %{{y:.4f}}<extra></extra>'
        ))
        # Upper quantile line
        fig.add_trace(go.Scatter(
            x=x_pred,
            y=quantile_high,
            mode='lines',
            name=f'Q{q_high_str}',
            line=dict(color=color, width=1),
            opacity=0.7,
            showlegend=False,
            hovertemplate=f'Time: %{{x}}<br>Q{q_high_str}: %{{y:.4f}}<extra></extra>'
        ))

    # Plot median (0.5) if selected
    if "0.5" in selected_quantiles:
        quantile_values = get_quantile_values("0.5")
        if quantile_values is not None:
            color = quantile_colors.get("0.5", "#08306b")
            fig.add_trace(go.Scatter(
                x=x_pred,
                y=quantile_values,
                mode='lines+markers',
                name='Median (Q0.5)',
                line=dict(color=color, width=3),
                marker=dict(size=5, symbol='circle'),
                opacity=0.8,
                hovertemplate='Time: %{x}<br>Q0.5: %{y:.4f}<extra></extra>'
            ))

    # Update layout - use autosize for responsive width
    x_axis_title = "Timestamp" if timestamps is not None else "Time Step"
    fig.update_layout(
        title=None,
        xaxis_title=x_axis_title,
        yaxis_title="Value",
        hovermode='x unified',
        autosize=True,  # Use automatic width to make chart responsive to container size
        height=400,
        margin=dict(l=60, r=40, t=60, b=60),  # Set reasonable margins
        legend=dict(
            orientation="h",
            yanchor="bottom",
            y=1.02,
            xanchor="right",
            x=1,
            font=dict(size=14)
        ),
        plot_bgcolor='white',
        xaxis=dict(showgrid=True, gridcolor='lightgray', gridwidth=1),
        yaxis=dict(showgrid=True, gridcolor='lightgray', gridwidth=1)
    )

    # Create info message for prediction window
    if timestamps is not None:
        pred_start_ts = timestamps[test_window_start_idx]
        pred_end_ts = timestamps[test_window_end_idx - 1]  # -1 because end index is exclusive
        # Format with weekday name
        start_str = f"{pred_start_ts.strftime('%Y-%m-%d %H:%M:%S')} ({pred_start_ts.day_name()})"
        end_str = f"{pred_end_ts.strftime('%Y-%m-%d %H:%M:%S')} ({pred_end_ts.day_name()})"
        base_info = (
            f"π Prediction Length: {prediction_length}\n"
            f"π Prediction Range: {start_str} β {end_str}\n"
            f"π Dataset Frequency: {dataset_freq}"
        )
    else:
        base_info = (
            f"π Prediction Length: {prediction_length}\n"
            f"π Prediction Range: index {test_window_start_idx} β {test_window_end_idx - 1}\n"
            f"π Dataset Frequency: {dataset_freq}"
        )

    # Append features information for the selected variate (may be empty)
    info_message = base_info + _build_features_info(dataset_term, series, variate)
    print(f"π Info message: {info_message}")
    return fig, info_message
def init_overall_tab():
    """Build the Overall tab: explanatory text, the leaderboard table, and a CSV export.

    Must be called inside an active Gradio Blocks context, since it only
    instantiates components and wires events. Clicking the export button
    regenerates the overall leaderboard (metric "MASE"), writes it to a
    temporary CSV, and then makes the download component visible.
    """
    gr.Markdown(
        """
        This tab presents each model's overall performance aggregated across all tasks. A **task** is defined as a specific **(dataset, horizon)** pair. For each task, the result is obtained by averaging the metrics across all its variates.
        - **MASE (norm.), CRPS (norm.)**: task-level results are normalized by Seasonal Naive and aggregated by geometric mean.
        - **MASE_rank, CRPS_rank**: for each task, models are ranked by the metric; the average rank across all tasks is then reported.
        """,
        elem_classes="markdown-text"
    )

    leaderboard_table = gr.DataFrame(
        value=get_overall_leaderboard(DATASETS_DF, metric="MASE"),
        elem_classes="custom-table",
        interactive=False
    )

    def write_overall_csv():
        # Recompute on demand so the export reflects the current data.
        leaderboard = get_overall_leaderboard(DATASETS_DF, metric="MASE")
        return export_dataframe_to_csv(leaderboard, filename_prefix="overall_leaderboard")

    def reveal_download():
        return gr.File(visible=True)

    with gr.Row():
        download_button = gr.Button("π₯ Export CSV", size="sm")
        download_file = gr.File(label="Download CSV", visible=False)

    # Step 1 fills the file component, step 2 un-hides it.
    export_event = download_button.click(
        fn=write_overall_csv,
        inputs=[],
        outputs=[download_file]
    )
    export_event.then(
        fn=reveal_download,
        inputs=[],
        outputs=[download_file]
    )
def init_per_dataset_tab(demo):
    """Lay out the per-dataset tab and wire up its callbacks.

    The tab offers a horizon checkbox group plus dataset / series / variate
    dropdowns, and shows the result of ``get_dataset_multilevel_leaderboard``
    as a status message and a table. An "Export CSV" button re-runs the same
    query and exports the resulting frame.

    Args:
        demo: Object exposing ``.load(...)``, used here only to populate the
            table on page load (presumably the enclosing ``gr.Blocks`` app).
    """
    gr.Markdown(
        """
        This tab provides flexible analysis at dataset, series, and variate levels.
        - **Dataset only**: Shows both Seasonal Naive-normalized metrics (task-level) and original non-normalized metrics, plus average ranks
        - **Series/Variate selected**: Shows only original metrics.
        - **Horizons**: Select one or more horizons to aggregate results
        """,
        elem_classes="markdown-text"
    )

    # Seed the controls from the first dataset. An empty DATASET_CHOICES used
    # to raise IndexError here while the UI was being built; fall back to
    # "no dataset / all horizons" exactly as init_per_window_tab does.
    initial_dataset = DATASET_CHOICES[0] if DATASET_CHOICES else None
    initial_horizons = (
        get_available_horizons(initial_dataset) if initial_dataset else ALL_HORIZONS
    )

    # NOTE(review): container nesting was reconstructed from a flattened
    # source; confirm that msg/table belong below the row, not inside it.
    with gr.Row():
        with gr.Column(scale=1):
            horizons = gr.CheckboxGroup(
                choices=initial_horizons,
                value=initial_horizons,
                label="Horizons"
            )
            dataset_dropdown = gr.Dropdown(
                choices=DATASET_CHOICES,
                value=initial_dataset,
                label="Dataset",
                interactive=True
            )
            # The helper creates the series and variate dropdowns for us.
            series_dropdown, variate_dropdown = update_series_and_variate(
                initial_dataset
            )

    msg = gr.Textbox(label="Message", interactive=False)
    table = gr.DataFrame(elem_classes="custom-table", interactive=False)

    # Every leaderboard refresh reads the same controls and writes the same
    # two widgets, so name the lists once.
    leaderboard_inputs = [dataset_dropdown, series_dropdown, variate_dropdown, horizons]
    leaderboard_outputs = [msg, table]

    # Dataset change: refresh horizon choices, then series/variate choices,
    # then the leaderboard itself.
    dataset_dropdown.change(
        fn=update_horizon_checkbox_choices,
        inputs=[dataset_dropdown],
        outputs=[horizons],
    ).then(
        fn=update_series_and_variate,
        inputs=[dataset_dropdown],
        outputs=[series_dropdown, variate_dropdown],
    ).then(
        fn=get_dataset_multilevel_leaderboard,
        inputs=leaderboard_inputs,
        outputs=leaderboard_outputs
    )

    # Series, variate or horizon change: refresh the leaderboard.
    for comp in [series_dropdown, variate_dropdown, horizons]:
        comp.change(
            fn=get_dataset_multilevel_leaderboard,
            inputs=leaderboard_inputs,
            outputs=leaderboard_outputs
        )

    # Populate the table when the page first loads.
    demo.load(
        fn=get_dataset_multilevel_leaderboard,
        inputs=leaderboard_inputs,
        outputs=leaderboard_outputs
    )

    def export_dataset_csv(dataset, series, variate, horizons_val):
        """Re-run the current query and export its table as CSV."""
        _, df = get_dataset_multilevel_leaderboard(dataset, series, variate, horizons_val)
        # "/" in a dataset name would otherwise end up in the filename prefix.
        safe_dataset_name = dataset.replace("/", "_") if dataset else "unknown"
        return export_dataframe_to_csv(df, filename_prefix=f"dataset_{safe_dataset_name}")

    with gr.Row():
        export_btn = gr.Button("π₯ Export CSV", size="sm")
        export_file = gr.File(label="Download CSV", visible=False)

    # Step 1 writes the CSV into the File component; step 2 un-hides it.
    export_btn.click(
        fn=export_dataset_csv,
        inputs=leaderboard_inputs,
        outputs=[export_file]
    ).then(
        fn=lambda: gr.File(visible=True),
        inputs=[],
        outputs=[export_file]
    )
def init_per_window_tab(demo):
    """Lay out the per-window tab and wire up its callbacks.

    The left column holds the horizon radio and the dataset / series /
    variate / window dropdowns; the right column holds the quantile and model
    selectors, the plot produced by ``plot_window_series`` and its info box.
    A table filled by ``get_window_leaderboard`` and an "Export CSV" button
    complete the tab.

    Args:
        demo: Object exposing ``.load(...)``, used here only to draw the plot
            and table on page load (presumably the enclosing ``gr.Blocks``
            app).
    """
    gr.Markdown(
        """
        This tab enables detailed analysis of model performance at the level of individual testing windows. By selecting a dataset, variate, horizon, and test window, users can examine window-level metrics (MASE, CRPS, MAE, MSE) at fine granularity and visualize the predicted quantiles of a model along with the ground-truth.
        - **Interactive Visualization**: Zoom, pan, autoscale and download the plot.
        - π¦ Train Split π¨ Test Split π₯ Prediction Window
        """
    )

    QUANTILE_PAIR_CHOICES = ["0.1-0.9", "0.2-0.8", "0.3-0.7", "0.4-0.6", "0.5"]
    initial_quantiles = ["0.5"]

    # NOTE(review): container nesting was reconstructed from a flattened
    # source; confirm the placement of the plot, info box and table.
    with gr.Row():
        with gr.Column(scale=1):
            # Seed the horizon choices from the first dataset, if there is one.
            initial_dataset = DATASET_CHOICES[0] if DATASET_CHOICES else None
            initial_horizons = get_available_horizons(initial_dataset) if initial_dataset else ALL_HORIZONS
            # Prefer "short" when offered, else the first available horizon.
            horizons = gr.Radio(
                choices=initial_horizons,
                value="short" if "short" in initial_horizons else (initial_horizons[0] if initial_horizons else "short"),
                label="Horizons"
            )
            dataset_dropdown = gr.Dropdown(
                choices=DATASET_CHOICES,
                value=initial_dataset,  # first dataset by default, None if there are none
                label="Dataset",
                interactive=True
            )
            # The helper creates the series, variate and window dropdowns.
            series_dropdown, variate_dropdown, window_dropdown = update_series_variate_and_window(
                dataset_dropdown.value, horizons.value
            )
        with gr.Column(scale=2):
            with gr.Row():
                with gr.Column(scale=2):
                    quantiles = gr.CheckboxGroup(
                        choices=QUANTILE_PAIR_CHOICES,
                        value=initial_quantiles,
                        label="Select Quantiles for Visualization"
                    )
                with gr.Column(scale=1):
                    model = gr.Dropdown(
                        choices=ALL_MODELS,
                        # Same empty-list guard as the dataset default above.
                        value=ALL_MODELS[0] if ALL_MODELS else None,
                        label="Select Model for Visualization",
                        interactive=True
                    )
            ts_visualization = gr.Plot()
            # Read-only box for the message returned alongside the figure.
            prediction_info = gr.Textbox(
                label="Info",
                interactive=False,
                lines=3
            )

    table_window = gr.DataFrame(elem_classes="custom-table", interactive=False)

    # Shared input / output lists for the plot and table callbacks.
    _plot_in = [dataset_dropdown, series_dropdown, variate_dropdown,
                window_dropdown, horizons, quantiles, model]
    _plot_out = [ts_visualization, prediction_info]
    _tbl_in = [dataset_dropdown, series_dropdown, variate_dropdown,
               window_dropdown, horizons]
    _tbl_out = table_window

    # Dataset change: update horizons -> update dropdowns -> refresh plot ->
    # refresh table. The chain itself ends with plot + table.
    dataset_dropdown.change(
        fn=update_horizon_choices,
        inputs=[dataset_dropdown],
        outputs=[horizons],
    ).then(
        fn=update_series_variate_and_window,
        inputs=[dataset_dropdown, horizons],
        outputs=[series_dropdown, variate_dropdown, window_dropdown],
    ).then(
        fn=plot_window_series, inputs=_plot_in, outputs=_plot_out,
    ).then(
        fn=get_window_leaderboard, inputs=_tbl_in, outputs=_tbl_out,
    )

    # Horizon change: update dropdowns -> refresh plot -> refresh table.
    horizons.change(
        fn=update_series_variate_and_window,
        inputs=[dataset_dropdown, horizons],
        outputs=[series_dropdown, variate_dropdown, window_dropdown],
    ).then(
        fn=plot_window_series, inputs=_plot_in, outputs=_plot_out,
    ).then(
        fn=get_window_leaderboard, inputs=_tbl_in, outputs=_tbl_out,
    )

    # Series / variate / window edited by the user: one chain per dropdown so
    # a selection fires plot + table exactly once.
    # Bound with `.input` (user edits only) rather than `.change`: `.change`
    # also fires when the dataset/horizon chains above rewrite these
    # dropdowns, which re-ran plot + table once per rewritten dropdown on top
    # of the refresh those chains already perform. Nothing else writes to
    # these three components, so no refresh is lost.
    for comp in [series_dropdown, variate_dropdown, window_dropdown]:
        comp.input(
            fn=plot_window_series, inputs=_plot_in, outputs=_plot_out,
        ).then(
            fn=get_window_leaderboard, inputs=_tbl_in, outputs=_tbl_out,
        )

    # Quantiles / model only affect the plot, never the table.
    for comp in [quantiles, model]:
        comp.change(
            fn=plot_window_series, inputs=_plot_in, outputs=_plot_out,
        )

    # Initial page load: draw the plot and fill the table.
    demo.load(
        fn=plot_window_series, inputs=_plot_in, outputs=_plot_out,
    )
    demo.load(
        fn=get_window_leaderboard, inputs=_tbl_in, outputs=_tbl_out,
    )

    def export_window_csv(dataset, series, variate, window, horizon):
        """Re-run the current window query and export its table as CSV."""
        df = get_window_leaderboard(dataset, series, variate, window, horizon)
        return export_dataframe_to_csv(df, filename_prefix="window_leaderboard")

    with gr.Row():
        export_btn = gr.Button("π₯ Export CSV", size="sm")
        export_file = gr.File(label="Download CSV", visible=False)

    # Step 1 writes the CSV into the File component; step 2 un-hides it.
    export_btn.click(
        fn=export_window_csv,
        inputs=[dataset_dropdown, series_dropdown, variate_dropdown, window_dropdown, horizons],
        outputs=[export_file]
    ).then(
        fn=lambda: gr.File(visible=True),
        inputs=[],
        outputs=[export_file]
    )
def init_per_pattern_tab(demo):
    """Lay out the per-pattern tab and wire up its callbacks.

    Seven three-state radios ("N/A", "=1", "=0") select required values for
    the pattern features; together with the horizon checkboxes they are passed
    to ``get_pattern_leaderboard``, whose result fills a status box and a
    table. An "Export CSV" button re-runs the same query and exports the
    resulting frame.

    Args:
        demo: Object exposing ``.load(...)``, used here only to populate the
            table on page load (presumably the enclosing ``gr.Blocks`` app).
    """
    gr.Markdown(
        """
        This tab allows you to explore model performance based on **selected patterns**.
        Select patterns to filter variates that exhibit those characteristics, then view aggregated model performance.
        Each pattern is a **boolean indicator** derived from time series features (binarized by **median** threshold for continuous features).
        - **Patterns are intersected**: A variate must exhibit ALL selected patterns to be included.
        - **MASE (norm.), CRPS (norm.)**: variate-level results are normalized by Seasonal Naive and aggregated by geometric mean across all matching variates.
        - **MASE (raw), CRPS (raw)**: arithmetic mean across all matching variates.
        """,
        elem_classes="markdown-text"
    )

    # Options shared by every pattern radio.
    PATTERN_CHOICES = ["N/A", "=1", "=0"]

    def _pattern_radio(feature):
        """Create one radio for `feature`, defaulting to "N/A" (no filter)."""
        return gr.Radio(choices=PATTERN_CHOICES, value="N/A", label=feature)

    # One column per feature family.
    with gr.Row():
        with gr.Column(scale=1):
            with gr.Group():
                gr.Markdown("### π Trend Features")
                T_strength = _pattern_radio("T_strength")
                T_linearity = _pattern_radio("T_linearity")
        with gr.Column(scale=1):
            with gr.Group():
                gr.Markdown("### π Seasonal Features")
                S_strength = _pattern_radio("S_strength")
                S_corr = _pattern_radio("S_corr")
        with gr.Column(scale=1):
            with gr.Group():
                gr.Markdown("### π― Residual Features")
                R_ACF1 = _pattern_radio("R_ACF1")
        with gr.Column(scale=1):
            with gr.Group():
                gr.Markdown("### βοΈ Global Features")
                stationarity = _pattern_radio("stationarity")
                complexity = _pattern_radio("complexity")

    # Radios and their feature names, kept in the same order so callback
    # values can be zipped back to names.
    pattern_radios = [
        T_strength, T_linearity,
        S_strength, S_corr,
        R_ACF1,
        stationarity, complexity,
    ]
    pattern_names = [
        "T_strength", "T_linearity",
        "S_strength", "S_corr",
        "R_ACF1",
        "stationarity", "complexity",
    ]

    # NOTE(review): container nesting was reconstructed from a flattened
    # source; confirm that the table belongs below this row.
    with gr.Row():
        with gr.Column(scale=1):
            horizons = gr.CheckboxGroup(
                choices=ALL_HORIZONS,
                value=ALL_HORIZONS,
                label="Horizons"
            )
        with gr.Column(scale=2):
            msg_pattern = gr.Textbox(label="Status", interactive=False, lines=4)

    table_variates = gr.DataFrame(elem_classes="custom-table", interactive=False)

    def merge_patterns(*radio_values):
        """Translate radio selections into a pattern-filter dict.

        Args:
            *radio_values: Radio values, in the order of ``pattern_names``.

        Returns:
            dict: ``{feature_name: required_value}`` with required_value 0 or
            1. Features left at "N/A" (or any other value) are omitted, i.e.
            not filtered on.
        """
        required_by_choice = {"=1": 1, "=0": 0}
        return {
            feature: required_by_choice[choice]
            for feature, choice in zip(pattern_names, radio_values)
            if choice in required_by_choice
        }

    def _run_query(callback_args):
        """Split (radio values..., horizons) and query the pattern leaderboard."""
        selected_horizons = callback_args[-1]
        filters = merge_patterns(*callback_args[:-1])
        return get_pattern_leaderboard(filters, selected_horizons)

    def update_leaderboard(*args):
        """Refresh the status box and table.

        Args:
            *args: All radio values followed by the horizons selection.
        """
        return _run_query(args)

    # Any radio or the horizon group changing re-runs the query with the full
    # set of controls.
    all_inputs = pattern_radios + [horizons]
    refresh_outputs = [msg_pattern, table_variates]
    for control in all_inputs:
        control.change(
            fn=update_leaderboard,
            inputs=all_inputs,
            outputs=refresh_outputs,
        )

    # Populate the table when the page first loads.
    demo.load(
        fn=update_leaderboard,
        inputs=all_inputs,
        outputs=refresh_outputs,
    )

    def export_pattern_csv(*args):
        """Re-run the current query and export its table as CSV.

        Args:
            *args: All radio values followed by the horizons selection.
        """
        _, frame = _run_query(args)
        return export_dataframe_to_csv(frame, filename_prefix="pattern_leaderboard")

    def _reveal_download():
        """Make the (initially hidden) download widget visible."""
        return gr.File(visible=True)

    with gr.Row():
        export_btn = gr.Button("π₯ Export CSV", size="sm")
        export_file = gr.File(label="Download CSV", visible=False)

    # Step 1 writes the CSV into the File component; step 2 un-hides it.
    export_event = export_btn.click(
        fn=export_pattern_csv,
        inputs=all_inputs,
        outputs=[export_file],
    )
    export_event.then(
        fn=_reveal_download,
        inputs=[],
        outputs=[export_file],
    )