"""Forecast evaluation metrics.

Utilities for scoring point forecasts (MAE, RMSE, MAPE, ...) and
prediction intervals (coverage, width), plus display helpers.
"""

import logging
from typing import Dict, Any

import numpy as np
import pandas as pd

logger = logging.getLogger(__name__)


def calculate_metrics(
    actual: pd.Series,
    forecast: pd.Series,
    include_percentage: bool = True
) -> Dict[str, float]:
    """
    Calculate forecast accuracy metrics.

    Args:
        actual: Actual values
        forecast: Forecasted values
        include_percentage: Include percentage-based metrics

    Returns:
        Dictionary of metrics
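
    Example (illustrative values):
        >>> import pandas as pd
        >>> actual = pd.Series([100.0, 102.0, 98.0])
        >>> forecast = pd.Series([101.0, 100.0, 99.0])
        >>> m = calculate_metrics(actual, forecast)
        >>> round(m['MAE'], 4)
        1.3333
        >>> round(m['RMSE'], 4)
        1.4142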
    """
    try:
        # Align the two series on their shared length.
        min_len = min(len(actual), len(forecast))
        actual = actual.iloc[:min_len].values
        forecast = forecast.iloc[:min_len].values

        # Drop positions where either value is NaN.
        mask = ~(np.isnan(actual) | np.isnan(forecast))
        actual = actual[mask]
        forecast = forecast[mask]

        if len(actual) == 0:
            return {'error': 'No valid values for metric calculation'}

        metrics = {}

        # Mean absolute error.
        metrics['MAE'] = float(np.mean(np.abs(actual - forecast)))

        # Root mean squared error.
        metrics['RMSE'] = float(np.sqrt(np.mean((actual - forecast) ** 2)))

        # Mean error (bias): positive values mean the forecast runs high.
        metrics['ME'] = float(np.mean(forecast - actual))

        if include_percentage:
            # MAPE is undefined where actual == 0, so mask those points out.
            mask_nonzero = actual != 0
            if mask_nonzero.any():
                mape = np.mean(np.abs((actual[mask_nonzero] - forecast[mask_nonzero]) / actual[mask_nonzero])) * 100
                metrics['MAPE'] = float(mape)

            # Symmetric MAPE; skip points where both series are zero.
            denominator = (np.abs(actual) + np.abs(forecast)) / 2
            mask_nonzero = denominator != 0
            if mask_nonzero.any():
                smape = np.mean(np.abs(actual[mask_nonzero] - forecast[mask_nonzero]) / denominator[mask_nonzero]) * 100
                metrics['sMAPE'] = float(smape)

        # Coefficient of determination (R^2).
        ss_res = np.sum((actual - forecast) ** 2)
        ss_tot = np.sum((actual - np.mean(actual)) ** 2)
        if ss_tot != 0:
            metrics['R2'] = float(1 - (ss_res / ss_tot))

        return metrics

    except Exception as e:
        logger.error(f"Error calculating metrics: {e}", exc_info=True)
        return {'error': str(e)}


def calculate_coverage(
    actual: pd.Series,
    lower_bound: pd.Series,
    upper_bound: pd.Series
) -> float:
    """
    Calculate empirical coverage of prediction intervals.

    Args:
        actual: Actual values
        lower_bound: Lower bound of prediction interval
        upper_bound: Upper bound of prediction interval

    Returns:
        Coverage percentage (0-100)
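
    Example (illustrative values):
        >>> import pandas as pd
        >>> actual = pd.Series([1.0, 2.0, 3.0])
        >>> lower = pd.Series([0.5, 2.5, 2.5])
        >>> upper = pd.Series([1.5, 3.0, 3.5])
        >>> round(calculate_coverage(actual, lower, upper), 1)
        66.7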
    """
    try:
        # Align all three series on their shortest length.
        min_len = min(len(actual), len(lower_bound), len(upper_bound))
        actual = actual.iloc[:min_len].values
        lower_bound = lower_bound.iloc[:min_len].values
        upper_bound = upper_bound.iloc[:min_len].values

        # A point is covered when it falls inside the interval (inclusive).
        # Note: NaN comparisons evaluate to False, so NaN points count as misses.
        within_bounds = (actual >= lower_bound) & (actual <= upper_bound)
        coverage = np.mean(within_bounds) * 100

        return float(coverage)

    except Exception as e:
        logger.error(f"Error calculating coverage: {e}", exc_info=True)
        return 0.0


def calculate_interval_width(
    lower_bound: pd.Series,
    upper_bound: pd.Series
) -> Dict[str, float]:
    """
    Calculate statistics about prediction interval width.

    Args:
        lower_bound: Lower bound of prediction interval
        upper_bound: Upper bound of prediction interval

    Returns:
        Dictionary with width statistics
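
    Example (illustrative values):
        >>> import pandas as pd
        >>> w = calculate_interval_width(pd.Series([1.0, 2.0]), pd.Series([3.0, 5.0]))
        >>> w['mean_width']
        2.5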
    """
    try:
        # Subtraction aligns on the pandas index; mismatched indexes yield NaN.
        widths = upper_bound - lower_bound

        return {
            'mean_width': float(widths.mean()),
            'median_width': float(widths.median()),
            'min_width': float(widths.min()),
            'max_width': float(widths.max()),
            'std_width': float(widths.std())
        }

    except Exception as e:
        logger.error(f"Error calculating interval width: {e}", exc_info=True)
        return {}


def format_metric(value: float, metric_name: str) -> str:
    """
    Format a metric value for display.

    Args:
        value: Metric value
        metric_name: Name of the metric

    Returns:
        Formatted string
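
    Example (illustrative values):
        >>> format_metric(12.3456, 'MAPE')
        '12.35%'
        >>> format_metric(1234.5, 'MAE')
        '1,234.50'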
    """
    if metric_name in ['MAPE', 'sMAPE']:
        # Percentage-based metrics carry a percent sign.
        return f"{value:.2f}%"
    elif metric_name in ['MAE', 'RMSE', 'ME']:
        # Thousands separator for large error magnitudes.
        if abs(value) >= 1000:
            return f"{value:,.2f}"
        else:
            return f"{value:.4f}"
    else:
        # R2 is a unitless ratio, not a percentage, so it falls through here.
        return f"{value:.4f}"


def summarize_forecast_quality(
    forecast_df: pd.DataFrame,
    confidence_levels: list
) -> Dict[str, Any]:
    """
    Summarize the quality of a forecast.

    Args:
        forecast_df: DataFrame with forecast results (must contain a
            'forecast' column; interval columns are named 'lower_<cl>'
            and 'upper_<cl>')
        confidence_levels: List of confidence levels (e.g. [80, 95])

    Returns:
        Summary dictionary
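
    Example (illustrative values):
        >>> import pandas as pd
        >>> df = pd.DataFrame({'forecast': [10.0, 12.0],
        ...                    'lower_95': [8.0, 9.0],
        ...                    'upper_95': [12.0, 15.0]})
        >>> s = summarize_forecast_quality(df, [95])
        >>> s['horizon']
        2
        >>> s['interval_widths']
        {'95%': 5.0}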
    """
    try:
        summary = {
            'horizon': len(forecast_df),
            'forecast_range': {
                'min': float(forecast_df['forecast'].min()),
                'max': float(forecast_df['forecast'].max()),
                'mean': float(forecast_df['forecast'].mean())
            }
        }

        # Mean interval width per confidence level, where both bounds exist.
        interval_widths = {}
        for cl in confidence_levels:
            lower_col = f'lower_{cl}'
            upper_col = f'upper_{cl}'

            if lower_col in forecast_df.columns and upper_col in forecast_df.columns:
                width = (forecast_df[upper_col] - forecast_df[lower_col]).mean()
                interval_widths[f'{cl}%'] = float(width)

        summary['interval_widths'] = interval_widths

        return summary

    except Exception as e:
        logger.error(f"Error summarizing forecast: {e}", exc_info=True)
        return {}
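

if __name__ == "__main__":
    # Minimal smoke test with made-up numbers, handy when running this
    # module directly; not part of the public API.
    _actual = pd.Series([100.0, 102.0, 98.0, 105.0])
    _forecast = pd.Series([101.0, 100.0, 99.0, 104.0])
    for _name, _value in calculate_metrics(_actual, _forecast).items():
        print(f"{_name}: {format_metric(_value, _name)}")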