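"""LSTM-based outbreak detection for a univariate time series (news volume or
case counts): a pretrained LstmModel forecasts one step ahead and anomalies are
flagged by thresholding the forecast error with a rolling IQR or a rolling
percentile rule."""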
import torch
import numpy as np
import pandas as pd
import torch.utils.data as data_utils
from sklearn.preprocessing import MinMaxScaler

from .lstm_model import LstmModel, testing

PRETRAINED_MODEL_N_CHANNELS = 1
PRETRAINED_MODEL_Z_SIZE = 32


class LSTMforOutbreakDetection:
    """Flags outbreak-like anomalies in a univariate time series by comparing
    one-step-ahead LSTM forecasts with the observed values and thresholding the
    forecast error (rolling IQR or rolling percentile)."""

    def __init__(
        self,
        checkpoint_path=None,
        n_channels=PRETRAINED_MODEL_N_CHANNELS,
        z_size=PRETRAINED_MODEL_Z_SIZE,
        device='cpu',
        window=7,
        batch_size=32,
        k=1.5,
        percentile=95,
        threshold_method=0
    ):
        self.device = torch.device(device)
        self.window = window
        self.batch_size = batch_size
        self.n_channels = n_channels
        self.z_size = z_size
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        self.k = k                                # IQR multiplier (threshold methods 0-2)
        self.percentile = percentile              # rolling percentile (threshold methods 3-4)
        self.threshold_method = threshold_method  # see detect_anomalies() for the mapping
        # Keep the attribute defined even when no checkpoint is given.
        self.model = self._load_model(checkpoint_path) if checkpoint_path else None

    def _load_model(self, checkpoint_path):
        model = LstmModel(self.n_channels, self.z_size)
        model = model.to(self.device)
        model.load_state_dict(torch.load(checkpoint_path, map_location=self.device))
        model.eval()  # the checkpoint is only used for inference
        return model

    def create_test_sequences(self, dataframe, time_steps, news_or_cases='news'):
        """Min-max scale the target column (in place) and build sliding windows:
        X has shape (n_windows, time_steps, 1) and y holds the value that
        immediately follows each window."""
        if news_or_cases not in ['news', 'cases']:
            raise ValueError("news_or_cases should be either 'news' or 'cases'")

        output, output2 = [], []
        # The column is scaled in place so that targets and forecasts share the
        # same [0, 1] scale downstream.
        dataframe[[news_or_cases]] = self.scaler.fit_transform(dataframe[[news_or_cases]])
        norm = np.array(dataframe[[news_or_cases]]).astype(float)

        for i in range(len(norm)):
            end_ix = i + time_steps
            if end_ix > len(norm) - 1:
                break
            seq_x, seq_y = norm[i:end_ix, :], norm[end_ix, 0]
            output.append(seq_x)
            output2.append(seq_y)

        return np.stack(output), np.stack(output2)

    def prepare_input_dataframe(self, dataframe, news_column_name='news'):
        X_test, y_test = self.create_test_sequences(dataframe, self.window, news_column_name)
        test_loader = data_utils.DataLoader(
            data_utils.TensorDataset(
                torch.from_numpy(X_test).float(),
                torch.from_numpy(y_test).float()
            ),
            batch_size=self.batch_size,
            shuffle=False,
            num_workers=0
        )
        return test_loader, y_test

    def predict(self, dataframe, news_column_name='news'):
        """Forecast one step ahead for every window and return the input rows
        (minus the first `window` rows) annotated with the forecast and several
        error measures."""
        test_loader, y_test = self.prepare_input_dataframe(dataframe, news_column_name)
        _, w = testing(self.model, test_loader, self.device)
        # `w` is a list of per-batch forecast tensors (the last batch may be
        # smaller), so flatten each batch and concatenate into a single array.
        forecast_test = torch.cat([batch.flatten() for batch in w]).detach().cpu().numpy()

        test_df = dataframe[self.window:].copy()
        test_df['y_test'] = y_test
        test_df['pred_forec'] = forecast_test
        test_df['abs_loss'] = np.abs(test_df.y_test - test_df.pred_forec)
        # Relative error; the +1 in the denominator guards against near-zero forecasts.
        test_df['rel_loss'] = np.abs((test_df['pred_forec'] - test_df['y_test']) / (1 + test_df['pred_forec']))
        test_df['diff'] = test_df['y_test'] - test_df['pred_forec']

        return test_df

    @staticmethod
    def _iqr_rolling(timeseries, k):
        # Tukey-style upper fence: Q3 + k * IQR.
        q1, q3 = np.percentile(timeseries, [25, 75])
        iqr = q3 - q1
        return q3 + k * iqr

    def windowed_iqr(self, df, k, type_of_loss='diff'):
        """Slide a window over the chosen error column and mark a point as a
        peak if it exceeds the IQR upper fence in any window containing it."""
        peaks = {}

        for i in range(len(df)):
            end_ix = i + self.window
            if end_ix > len(df) - 1:
                break

            seq_x = df.iloc[i:end_ix, :]
            ub = self._iqr_rolling(seq_x[type_of_loss], k)

            for j in seq_x.index:
                condition = int(seq_x.loc[j, type_of_loss] > ub)
                # Keys are stringified index labels; the caller converts them
                # back to datetimes before joining.
                peaks.setdefault(f'{j}', []).append(condition)

        return {label: 1 if sum(v) > 0 else 0 for label, v in peaks.items()}

    def get_perc_threshold(self, test_df, percentile, col='abs_loss'):
        """Rolling-percentile thresholding: within each window, flag points whose
        error exceeds the window's `percentile`-th percentile."""
        if col not in ['abs_loss', 'loss']:
            raise ValueError("col should be either 'abs_loss' or 'loss'")

        # The last row is never covered by a full window, so it is dropped.
        test1 = test_df[:-1].copy()
        anom_perc_loss = {}

        for i in range(len(test_df)):
            end_ix = i + self.window
            if end_ix > len(test_df) - 1:
                break

            seq_x = test_df.iloc[i:end_ix, :]
            # Error series for this window: the absolute loss, or the raw
            # (signed) difference between ground truth and forecast.
            err = seq_x['abs_loss'] if col == 'abs_loss' else seq_x['y_test'] - seq_x['pred_forec']
            threshold = np.percentile(err, percentile)

            for j in seq_x.index:
                condition = int(err.loc[j] > threshold)
                anom_perc_loss.setdefault(f'{j}', []).append(condition)

        final_anom = {label: 1 if sum(v) > 0 else 0 for label, v in anom_perc_loss.items()}
        new_col = 'anom_perc_abs_loss' if col == 'abs_loss' else 'anom_perc_diff_gt_pred'
        # Convert the stringified labels back to datetimes so the flags align
        # with the dataframe index (mirrors the IQR path in detect_anomalies).
        anom_series = pd.Series(final_anom)
        anom_series.index = pd.to_datetime(anom_series.index)
        test1[new_col] = anom_series

        return test1

    def postprocess_anomalies(self, test_df, new_col, old_col, news_or_cases):
        """Suppress flags that fall on a downward movement of the series: an
        outbreak should coincide with a rising signal, so a flagged point with a
        negative first derivative is reset to 0."""
        test_df = test_df.copy()
        test_df['derivative'] = test_df[news_or_cases].diff().fillna(0)
        test_df[new_col] = [0 if row.derivative < 0 and row[old_col] == 1 else row[old_col]
                            for _, row in test_df.iterrows()]
        return test_df

    def detect_anomalies(self, test_df, news_or_cases='news'):
        """
        Detect anomalies with the configured threshold_method:
            0: rolling IQR on (ground truth - forecast)
            1: rolling IQR on |ground truth - forecast|
            2: rolling IQR on |ground truth - forecast| / (1 + forecast)
            3: rolling percentile threshold on the absolute loss
            4: rolling percentile threshold on the raw (signed) loss

        Relevant constructor parameters: k (typically 1-3, for methods 0-2),
        percentile (for methods 3-4) and threshold_method.

        Returns the annotated dataframe and the name of the label column.
        """
        test_df = test_df.copy()
        test = self.predict(test_df, news_column_name=news_or_cases)

        if self.threshold_method in [0, 1, 2]:
            loss_type = {0: 'diff', 1: 'abs_loss', 2: 'rel_loss'}[self.threshold_method]
            iqr_suffix = {0: 'f_iqr', 1: 'abs_iqr', 2: 'rel_iqr'}[self.threshold_method]
            new_label = {0: 'f_new_label', 1: 'abs_new_label', 2: 'rel_new_label'}[self.threshold_method]

            peaks = self.windowed_iqr(test, self.k, loss_type)
            peak_series = pd.Series(peaks)
            peak_series.index = pd.to_datetime(peak_series.index)
            test[iqr_suffix] = peak_series
            test = self.postprocess_anomalies(test, new_label, iqr_suffix, news_or_cases)
            return test, new_label

        elif self.threshold_method in [3, 4]:
            loss_type = 'abs_loss' if self.threshold_method == 3 else 'loss'
            new_label = 'new_anom_absl' if self.threshold_method == 3 else 'new_anom_diff'
            old_label = 'anom_perc_abs_loss' if self.threshold_method == 3 else 'anom_perc_diff_gt_pred'

            test = self.get_perc_threshold(test, self.percentile, loss_type)
            test = self.postprocess_anomalies(test, new_label, old_label, news_or_cases)
            return test, new_label

        raise ValueError("threshold_method must be an integer between 0 and 4")
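

# ---------------------------------------------------------------------------
# Usage sketch: a minimal example of how this detector might be driven,
# assuming a CSV with a parseable date index and a 'news' column, plus a
# checkpoint trained with the accompanying lstm_model code. The file paths
# below are hypothetical placeholders. Run it with `python -m <package>.<module>`
# so that the relative import above resolves.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    # Hypothetical input: daily news-volume counts indexed by date.
    df = pd.read_csv('data/news_counts.csv', index_col=0, parse_dates=True)

    detector = LSTMforOutbreakDetection(
        checkpoint_path='checkpoints/lstm_news.pt',  # hypothetical checkpoint path
        window=7,
        threshold_method=1,  # rolling IQR on |ground truth - forecast|
        k=1.5,
    )

    labelled, label_col = detector.detect_anomalies(df, news_or_cases='news')
    print(labelled.loc[labelled[label_col] == 1].index)  # dates flagged as outbreaks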