Spaces:
Sleeping
Sleeping
import sys | |
import os | |
import yaml | |
import random | |
from typing import Any, BinaryIO, Callable, Dict, List, Optional, Sequence, Tuple, Union | |
import numpy as np | |
from scipy import stats | |
import pandas as pd | |
import darts | |
from darts import models | |
from darts import metrics | |
from darts import TimeSeries | |
# import data formatter | |
sys.path.append(os.path.join(os.path.dirname(__file__), '..')) | |
from data_formatter.base import * | |
from utils.darts_processing import * | |
def _get_values( | |
series: TimeSeries, stochastic_quantile: Optional[float] = 0.5 | |
) -> np.ndarray: | |
""" | |
Returns the numpy values of a time series. | |
For stochastic series, return either all sample values with (stochastic_quantile=None) or the quantile sample value | |
with (stochastic_quantile {>=0,<=1}) | |
""" | |
if series.is_deterministic: | |
series_values = series.univariate_values() | |
else: # stochastic | |
if stochastic_quantile is None: | |
series_values = series.all_values(copy=False) | |
else: | |
series_values = series.quantile_timeseries( | |
quantile=stochastic_quantile | |
).univariate_values() | |
return series_values | |
def _get_values_or_raise( | |
series_a: TimeSeries, | |
series_b: TimeSeries, | |
intersect: bool, | |
stochastic_quantile: Optional[float] = 0.5, | |
remove_nan_union: bool = False, | |
) -> Tuple[np.ndarray, np.ndarray]: | |
"""Returns the processed numpy values of two time series. Processing can be customized with arguments | |
`intersect, stochastic_quantile, remove_nan_union`. | |
Raises a ValueError if the two time series (or their intersection) do not have the same time index. | |
Parameters | |
---------- | |
series_a | |
A univariate deterministic ``TimeSeries`` instance (the actual series). | |
series_b | |
A univariate (deterministic or stochastic) ``TimeSeries`` instance (the predicted series). | |
intersect | |
A boolean for whether or not to only consider the time intersection between `series_a` and `series_b` | |
stochastic_quantile | |
Optionally, for stochastic predicted series, return either all sample values with (`stochastic_quantile=None`) | |
or any deterministic quantile sample values by setting `stochastic_quantile=quantile` {>=0,<=1}. | |
remove_nan_union | |
By setting `remove_non_union` to True, remove all indices from `series_a` and `series_b` which have a NaN value | |
in either of the two input series. | |
""" | |
series_a_common = series_a.slice_intersect(series_b) if intersect else series_a | |
series_b_common = series_b.slice_intersect(series_a) if intersect else series_b | |
series_a_det = _get_values(series_a_common, stochastic_quantile=stochastic_quantile) | |
series_b_det = _get_values(series_b_common, stochastic_quantile=stochastic_quantile) | |
if not remove_nan_union: | |
return series_a_det, series_b_det | |
b_is_deterministic = bool(len(series_b_det.shape) == 1) | |
if b_is_deterministic: | |
isnan_mask = np.logical_or(np.isnan(series_a_det), np.isnan(series_b_det)) | |
else: | |
isnan_mask = np.logical_or( | |
np.isnan(series_a_det), np.isnan(series_b_det).any(axis=2).flatten() | |
) | |
return np.delete(series_a_det, isnan_mask), np.delete( | |
series_b_det, isnan_mask, axis=0 | |
) | |
def rescale_and_backtest(series: Union[TimeSeries, | |
Sequence[TimeSeries]], | |
forecasts: Union[TimeSeries, | |
Sequence[TimeSeries], | |
Sequence[Sequence[TimeSeries]]], | |
metric: Union[ | |
Callable[[TimeSeries, TimeSeries], float], | |
List[Callable[[TimeSeries, TimeSeries], float]], | |
], | |
scaler: Callable[[TimeSeries], TimeSeries] = None, | |
reduction: Union[Callable[[np.ndarray], float], None] = np.mean, | |
likelihood: str = "GaussianMean", | |
cal_thresholds: Optional[np.ndarray] = np.linspace(0, 1, 11), | |
): | |
""" | |
Backtest the historical forecasts (as provided by Darts) on the series. | |
Parameters | |
---------- | |
series | |
The target time series. | |
forecasts | |
The forecasts. | |
scaler | |
The scaler used to scale the series. | |
metric | |
The metric or metrics to use for backtesting. | |
reduction | |
The reduction to apply to the metric. | |
likelihood | |
The likelihood to use for evaluating the model. | |
cal_thresholds | |
The thresholds to use for computing the calibration error. | |
Returns | |
------- | |
np.ndarray | |
Error array. If the reduction is none, array is of shape (n, p) | |
where n is the total number of samples (forecasts) and p is the number of metrics. | |
If the reduction is not none, array is of shape (k, p), where k is the number of series. | |
float | |
The estimated log-likelihood of the model on the data. | |
np.ndarray | |
The ECE for each time point in the forecast. | |
""" | |
series = [series] if isinstance(series, TimeSeries) else series | |
forecasts = [forecasts] if isinstance(forecasts, TimeSeries) else forecasts | |
metric = [metric] if not isinstance(metric, list) else metric | |
# compute errors: 1) reverse scaling forecasts and true values, 2)compute errors | |
backtest_list = [] | |
for idx in range(len(series)): | |
if scaler is not None: | |
series[idx] = scaler.inverse_transform(series[idx]) | |
forecasts[idx] = [scaler.inverse_transform(f) for f in forecasts[idx]] | |
errors = [ | |
[metric_f(series[idx], f) for metric_f in metric] | |
if len(metric) > 1 | |
else metric[0](series[idx], f) | |
for f in forecasts[idx] | |
] | |
if reduction is None: | |
backtest_list.append(np.array(errors)) | |
else: | |
backtest_list.append(reduction(np.array(errors), axis=0)) | |
backtest_list = np.vstack(backtest_list) | |
if likelihood == "GaussianMean": | |
# compute likelihood | |
est_var = [] | |
for idx, target_ts in enumerate(series): | |
est_var += [metrics.mse(target_ts, f) for f in forecasts[idx]] | |
est_var = np.mean(est_var) | |
forecast_len = forecasts[0][0].n_timesteps | |
log_likelihood = -0.5*forecast_len - 0.5*np.log(2*np.pi*est_var) | |
# compute calibration error: 1) cdf values 2) compute calibration error | |
# compute the cdf values | |
cdf_vals = [] | |
for idx in range(len(series)): | |
for forecast in forecasts[idx]: | |
y_true, y_pred = _get_values_or_raise(series[idx], | |
forecast, | |
intersect=True, | |
remove_nan_union=True) | |
y_true, y_pred = y_true.flatten(), y_pred.flatten() | |
cdf_vals.append(stats.norm.cdf(y_true, loc=y_pred, scale=np.sqrt(est_var))) | |
cdf_vals = np.vstack(cdf_vals) | |
# compute the prediction calibration | |
cal_error = np.zeros(forecasts[0][0].n_timesteps) | |
for p in cal_thresholds: | |
est_p = (cdf_vals <= p).astype(float) | |
est_p = np.mean(est_p, axis=0) | |
cal_error += (est_p - p) ** 2 | |
return backtest_list, log_likelihood, cal_error | |
def rescale_and_test(series: Union[TimeSeries, | |
Sequence[TimeSeries]], | |
forecasts: Union[TimeSeries, | |
Sequence[TimeSeries]], | |
metric: Union[ | |
Callable[[TimeSeries, TimeSeries], float], | |
List[Callable[[TimeSeries, TimeSeries], float]], | |
], | |
scaler: Callable[[TimeSeries], TimeSeries] = None, | |
likelihood: str = "GaussianMean", | |
cal_thresholds: Optional[np.ndarray] = np.linspace(0, 1, 11), | |
): | |
""" | |
Test the forecasts on the series. | |
Parameters | |
---------- | |
series | |
The target time series. | |
forecasts | |
The forecasts. | |
scaler | |
The scaler used to scale the series. | |
metric | |
The metric or metrics to use for backtesting. | |
reduction | |
The reduction to apply to the metric. | |
likelihood | |
The likelihood to use for evaluating the likelihood and calibration of model. | |
cal_thresholds | |
The thresholds to use for computing the calibration error. | |
Returns | |
------- | |
np.ndarray | |
Error array. If the reduction is none, array is of shape (n, p) | |
where n is the total number of samples (forecasts) and p is the number of metrics. | |
If the reduction is not none, array is of shape (k, p), where k is the number of series. | |
float | |
The estimated log-likelihood of the model on the data. | |
np.ndarray | |
The ECE for each time point in the forecast. | |
""" | |
series = [series] if isinstance(series, TimeSeries) else series | |
forecasts = [forecasts] if isinstance(forecasts, TimeSeries) else forecasts | |
metric = [metric] if not isinstance(metric, list) else metric | |
# compute errors: 1) reverse scaling forecasts and true values, 2)compute errors | |
series = scaler.inverse_transform(series) | |
forecasts = scaler.inverse_transform(forecasts) | |
errors = [ | |
[metric_f(t, f) for metric_f in metric] | |
if len(metric) > 1 | |
else metric[0](t, f) | |
for (t, f) in zip(series, forecasts) | |
] | |
errors = np.array(errors) | |
if likelihood == "GaussianMean": | |
# compute likelihood | |
est_var = [metrics.mse(t, f) for (t, f) in zip(series, forecasts)] | |
est_var = np.mean(est_var) | |
forecast_len = forecasts[0].n_timesteps | |
log_likelihood = -0.5*forecast_len - 0.5*np.log(2*np.pi*est_var) | |
# compute calibration error: 1) cdf values 2) compute calibration error | |
# compute the cdf values | |
cdf_vals = [] | |
for t, f in zip(series, forecasts): | |
t, f = _get_values_or_raise(t, f, intersect=True, remove_nan_union=True) | |
t, f = t.flatten(), f.flatten() | |
cdf_vals.append(stats.norm.cdf(t, loc=f, scale=np.sqrt(est_var))) | |
cdf_vals = np.vstack(cdf_vals) | |
# compute the prediction calibration | |
cal_error = np.zeros(forecasts[0].n_timesteps) | |
for p in cal_thresholds: | |
est_p = (cdf_vals <= p).astype(float) | |
est_p = np.mean(est_p, axis=0) | |
cal_error += (est_p - p) ** 2 | |
if likelihood == "Quantile": | |
# no likelihood since we don't have a parametric model | |
log_likelihood = 0 | |
# compute calibration error: 1) get quantiles 2) compute calibration error | |
cal_error = np.zeros(forecasts[0].n_timesteps) | |
for p in cal_thresholds: | |
est_p = 0 | |
for t, f in zip(series, forecasts): | |
q = f.quantile(p) | |
t, q = _get_values_or_raise(t, q, intersect=True, remove_nan_union=True) | |
t, q = t.flatten(), q.flatten() | |
est_p += (t <= q).astype(float) | |
est_p = (est_p / len(series)).flatten() | |
cal_error += (est_p - p) ** 2 | |
return errors, log_likelihood, cal_error |