# glucosedao_gpu/utils/darts_evaluation.py
# Author: Livia_Zaharia
# Initial commit (bacf16b): added code for the first time
import sys
import os
import yaml
import random
from typing import Any, BinaryIO, Callable, Dict, List, Optional, Sequence, Tuple, Union
import numpy as np
from scipy import stats
import pandas as pd
import darts
from darts import models
from darts import metrics
from darts import TimeSeries
# import data formatter
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from data_formatter.base import *
from utils.darts_processing import *
def _get_values(
    series: TimeSeries, stochastic_quantile: Optional[float] = 0.5
) -> np.ndarray:
    """
    Extract the numpy values backing a time series.

    Deterministic series are returned as their univariate values. For a
    stochastic series, either every sample is returned
    (``stochastic_quantile=None``) or the values of a single quantile
    series (``stochastic_quantile`` in [0, 1]).
    """
    # Deterministic: one component, one sample -> flat value array.
    if series.is_deterministic:
        return series.univariate_values()
    # Stochastic: keep all samples, or reduce to a single quantile series.
    if stochastic_quantile is None:
        return series.all_values(copy=False)
    quantile_series = series.quantile_timeseries(quantile=stochastic_quantile)
    return quantile_series.univariate_values()
def _get_values_or_raise(
    series_a: TimeSeries,
    series_b: TimeSeries,
    intersect: bool,
    stochastic_quantile: Optional[float] = 0.5,
    remove_nan_union: bool = False,
) -> Tuple[np.ndarray, np.ndarray]:
    """Return the processed numpy values of two time series.

    Processing is customized with `intersect`, `stochastic_quantile` and
    `remove_nan_union`. Raises a ValueError if the two time series (or
    their intersection) do not have the same time index.

    Parameters
    ----------
    series_a
        A univariate deterministic ``TimeSeries`` instance (the actual series).
    series_b
        A univariate (deterministic or stochastic) ``TimeSeries`` instance
        (the predicted series).
    intersect
        Whether to only consider the time intersection of `series_a` and `series_b`.
    stochastic_quantile
        For a stochastic predicted series, return all sample values
        (`stochastic_quantile=None`) or the values of one deterministic
        quantile (`stochastic_quantile` in [0, 1]).
    remove_nan_union
        If True, drop every index at which either input series has a NaN value.
    """
    # Optionally restrict both series to their common time span.
    if intersect:
        common_a = series_a.slice_intersect(series_b)
        common_b = series_b.slice_intersect(series_a)
    else:
        common_a, common_b = series_a, series_b

    values_a = _get_values(common_a, stochastic_quantile=stochastic_quantile)
    values_b = _get_values(common_b, stochastic_quantile=stochastic_quantile)

    if not remove_nan_union:
        return values_a, values_b

    # A 1-d values_b means the prediction is deterministic; otherwise it is a
    # (time, component, sample) array and a timestamp counts as NaN if any of
    # its samples is NaN.
    if values_b.ndim == 1:
        nan_mask = np.isnan(values_a) | np.isnan(values_b)
    else:
        nan_mask = np.isnan(values_a) | np.isnan(values_b).any(axis=2).flatten()
    return (
        np.delete(values_a, nan_mask),
        np.delete(values_b, nan_mask, axis=0),
    )
def rescale_and_backtest(series: Union[TimeSeries,
                                       Sequence[TimeSeries]],
                         forecasts: Union[TimeSeries,
                                          Sequence[TimeSeries],
                                          Sequence[Sequence[TimeSeries]]],
                         metric: Union[
                             Callable[[TimeSeries, TimeSeries], float],
                             List[Callable[[TimeSeries, TimeSeries], float]],
                         ],
                         scaler: Callable[[TimeSeries], TimeSeries] = None,
                         reduction: Union[Callable[[np.ndarray], float], None] = np.mean,
                         likelihood: str = "GaussianMean",
                         cal_thresholds: Optional[np.ndarray] = np.linspace(0, 1, 11),
                         ):
    """
    Backtest the historical forecasts (as provided by Darts) on the series.

    Parameters
    ----------
    series
        The target time series (one, or one per group).
    forecasts
        The forecasts: a list of forecast series per target series.
    metric
        The metric or metrics to use for backtesting.
    scaler
        The scaler used to scale the series; if given, it is inverted before
        computing errors. If None, values are used as-is.
    reduction
        The reduction to apply to the per-forecast metric values (called with
        ``axis=0``, e.g. ``np.mean``). If None, raw errors are returned.
    likelihood
        The likelihood to use for evaluating the model. Only "GaussianMean"
        is supported; for any other value the likelihood and calibration
        error are returned as zeros.
    cal_thresholds
        The thresholds to use for computing the calibration error.

    Returns
    -------
    np.ndarray
        Error array. If the reduction is None, array is of shape (n, p)
        where n is the total number of samples (forecasts) and p is the number
        of metrics. Otherwise, array is of shape (k, p), where k is the number
        of series.
    float
        The estimated log-likelihood of the model on the data.
    np.ndarray
        The ECE for each time point in the forecast.
    """
    # Normalize inputs to list / list-of-lists. A single forecast TimeSeries
    # pairs with a single target series, so it is wrapped twice to match the
    # per-series list layout iterated below (the original single wrap left
    # forecasts[0] as a bare TimeSeries and broke the inner loop).
    series = [series] if isinstance(series, TimeSeries) else list(series)
    forecasts = [[forecasts]] if isinstance(forecasts, TimeSeries) else list(forecasts)
    metric = [metric] if not isinstance(metric, list) else metric

    # Compute errors: 1) reverse scaling of forecasts and true values,
    # 2) compute errors. The outer containers were shallow-copied above so the
    # rescaled series are NOT written back into the caller's lists.
    backtest_list = []
    for idx in range(len(series)):
        if scaler is not None:
            series[idx] = scaler.inverse_transform(series[idx])
            forecasts[idx] = [scaler.inverse_transform(f) for f in forecasts[idx]]
        errors = [
            [metric_f(series[idx], f) for metric_f in metric]
            if len(metric) > 1
            else metric[0](series[idx], f)
            for f in forecasts[idx]
        ]
        if reduction is None:
            backtest_list.append(np.array(errors))
        else:
            backtest_list.append(reduction(np.array(errors), axis=0))
    backtest_list = np.vstack(backtest_list)

    # Defaults so every likelihood value returns cleanly (previously any
    # likelihood other than "GaussianMean" raised NameError at the return).
    log_likelihood = 0.0
    cal_error = np.zeros(forecasts[0][0].n_timesteps)

    if likelihood == "GaussianMean":
        # Estimate a single homoscedastic Gaussian variance from the MSE of
        # all forecasts, then evaluate the likelihood under that model.
        est_var = []
        for idx, target_ts in enumerate(series):
            est_var += [metrics.mse(target_ts, f) for f in forecasts[idx]]
        est_var = np.mean(est_var)
        forecast_len = forecasts[0][0].n_timesteps
        log_likelihood = -0.5*forecast_len - 0.5*np.log(2*np.pi*est_var)

        # Compute calibration error: 1) cdf values, 2) calibration error.
        cdf_vals = []
        for idx in range(len(series)):
            for forecast in forecasts[idx]:
                y_true, y_pred = _get_values_or_raise(series[idx],
                                                      forecast,
                                                      intersect=True,
                                                      remove_nan_union=True)
                y_true, y_pred = y_true.flatten(), y_pred.flatten()
                cdf_vals.append(stats.norm.cdf(y_true, loc=y_pred, scale=np.sqrt(est_var)))
        cdf_vals = np.vstack(cdf_vals)

        # Expected calibration error per forecast time step: squared gap
        # between empirical and nominal coverage, summed over thresholds.
        cal_error = np.zeros(forecasts[0][0].n_timesteps)
        for p in cal_thresholds:
            est_p = (cdf_vals <= p).astype(float)
            est_p = np.mean(est_p, axis=0)
            cal_error += (est_p - p) ** 2

    return backtest_list, log_likelihood, cal_error
def rescale_and_test(series: Union[TimeSeries,
                                   Sequence[TimeSeries]],
                     forecasts: Union[TimeSeries,
                                      Sequence[TimeSeries]],
                     metric: Union[
                         Callable[[TimeSeries, TimeSeries], float],
                         List[Callable[[TimeSeries, TimeSeries], float]],
                     ],
                     scaler: Callable[[TimeSeries], TimeSeries] = None,
                     likelihood: str = "GaussianMean",
                     cal_thresholds: Optional[np.ndarray] = np.linspace(0, 1, 11),
                     ):
    """
    Test the forecasts on the series.

    Parameters
    ----------
    series
        The target time series.
    forecasts
        The forecasts, one per target series.
    metric
        The metric or metrics to use for testing.
    scaler
        The scaler used to scale the series; if given, it is inverted before
        computing errors. If None, values are used as-is.
    likelihood
        The likelihood to use for evaluating the likelihood and calibration of
        the model: "GaussianMean" or "Quantile". For any other value the
        likelihood and calibration error are returned as zeros.
    cal_thresholds
        The thresholds to use for computing the calibration error.

    Returns
    -------
    np.ndarray
        Error array of shape (n, p), where n is the number of forecasts and p
        is the number of metrics (flat when a single metric is given).
    float
        The estimated log-likelihood of the model on the data.
    np.ndarray
        The ECE for each time point in the forecast.
    """
    series = [series] if isinstance(series, TimeSeries) else series
    forecasts = [forecasts] if isinstance(forecasts, TimeSeries) else forecasts
    metric = [metric] if not isinstance(metric, list) else metric

    # Compute errors: 1) reverse scaling of forecasts and true values,
    # 2) compute errors. Guard the scaler: it defaults to None (previously the
    # inverse transform was called unconditionally and crashed on None).
    if scaler is not None:
        series = scaler.inverse_transform(series)
        forecasts = scaler.inverse_transform(forecasts)
    errors = [
        [metric_f(t, f) for metric_f in metric]
        if len(metric) > 1
        else metric[0](t, f)
        for (t, f) in zip(series, forecasts)
    ]
    errors = np.array(errors)

    # Defaults so every likelihood value returns cleanly (previously an
    # unknown likelihood raised NameError at the return).
    log_likelihood = 0.0
    cal_error = np.zeros(forecasts[0].n_timesteps)

    if likelihood == "GaussianMean":
        # Estimate a single homoscedastic Gaussian variance from the MSE of
        # all forecasts, then evaluate the likelihood under that model.
        est_var = [metrics.mse(t, f) for (t, f) in zip(series, forecasts)]
        est_var = np.mean(est_var)
        forecast_len = forecasts[0].n_timesteps
        log_likelihood = -0.5*forecast_len - 0.5*np.log(2*np.pi*est_var)

        # Compute calibration error: 1) cdf values, 2) calibration error.
        cdf_vals = []
        for t, f in zip(series, forecasts):
            t, f = _get_values_or_raise(t, f, intersect=True, remove_nan_union=True)
            t, f = t.flatten(), f.flatten()
            cdf_vals.append(stats.norm.cdf(t, loc=f, scale=np.sqrt(est_var)))
        cdf_vals = np.vstack(cdf_vals)

        # Expected calibration error per forecast time step.
        cal_error = np.zeros(forecasts[0].n_timesteps)
        for p in cal_thresholds:
            est_p = (cdf_vals <= p).astype(float)
            est_p = np.mean(est_p, axis=0)
            cal_error += (est_p - p) ** 2

    if likelihood == "Quantile":
        # No parametric model, so no likelihood is estimated.
        log_likelihood = 0
        # Compute calibration error: 1) get quantiles, 2) calibration error.
        # Empirical coverage of each predicted quantile vs its nominal level.
        cal_error = np.zeros(forecasts[0].n_timesteps)
        for p in cal_thresholds:
            est_p = 0
            for t, f in zip(series, forecasts):
                q = f.quantile(p)
                t, q = _get_values_or_raise(t, q, intersect=True, remove_nan_union=True)
                t, q = t.flatten(), q.flatten()
                est_p += (t <= q).astype(float)
            est_p = (est_p / len(series)).flatten()
            cal_error += (est_p - p) ** 2

    return errors, log_likelihood, cal_error