File size: 7,391 Bytes
9485251 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
import torch
import numpy as np
import pandas as pd
import torch.utils.data as data_utils
from sklearn.preprocessing import MinMaxScaler
from .lstm_model import LstmModel, testing
# Defaults for the two positional arguments handed to ``LstmModel`` when
# rebuilding the network that a checkpoint is loaded into.
PRETRAINED_MODEL_N_CHANNELS = 1
PRETRAINED_MODEL_Z_SIZE = 32


class LSTMforOutbreakDetection:
    """Flag anomalous points in a time series from LSTM forecast errors.

    The workflow is: min-max scale one column, cut it into sliding windows of
    ``window`` steps, let the model forecast the value following each window,
    derive loss columns from forecast vs. ground truth, then flag losses that
    exceed a rolling IQR or percentile threshold.

    Attributes set by the constructor:
        device: ``torch.device`` the model and inference run on.
        window: number of time steps per input sequence and per rolling
            threshold window.
        batch_size: batch size of the inference ``DataLoader``.
        n_channels, z_size: arguments forwarded to ``LstmModel``.
        scaler: ``MinMaxScaler`` with feature range (0, 1), refit on each call
            to ``create_test_sequences``.
        k: IQR multiplier used by threshold methods 0-2.
        percentile: percentile used by threshold methods 3-4.
        threshold_method: integer 0-4, see ``detect_anomalies``.
        model: the loaded ``LstmModel``, or ``None`` when no checkpoint path
            was given.
    """

    # threshold_method -> (loss column, raw flag column, post-processed column)
    _IQR_METHODS = {
        0: ('diff', 'f_iqr', 'f_new_label'),
        1: ('abs_loss', 'abs_iqr', 'abs_new_label'),
        2: ('rel_loss', 'rel_iqr', 'rel_new_label'),
    }
    # threshold_method -> (``col`` argument of get_perc_threshold,
    #                      raw flag column, post-processed column)
    _PERCENTILE_METHODS = {
        3: ('abs_loss', 'anom_perc_abs_loss', 'new_anom_absl'),
        4: ('loss', 'anom_perc_diff_gt_pred', 'new_anom_diff'),
    }

    def __init__(
        self,
        checkpoint_path=None,
        n_channels=PRETRAINED_MODEL_N_CHANNELS,
        z_size=PRETRAINED_MODEL_Z_SIZE,
        device='cpu',
        window=7,
        batch_size=32,
        k=1.5,
        percentile=95,
        threshold_method=0
    ):
        """Store the configuration and optionally load model weights.

        Args:
            checkpoint_path: path of a state-dict checkpoint; when falsy no
                model is loaded and ``self.model`` is ``None``.
            n_channels: first argument passed to ``LstmModel``.
            z_size: second argument passed to ``LstmModel``.
            device: anything accepted by ``torch.device``.
            window: sequence length / rolling threshold window size.
            batch_size: inference batch size.
            k: IQR multiplier (threshold methods 0-2).
            percentile: percentile threshold (threshold methods 3-4).
            threshold_method: integer 0-4 selecting the detection rule.
        """
        self.device = torch.device(device)
        self.window = window
        self.batch_size = batch_size
        self.n_channels = n_channels
        self.z_size = z_size
        self.scaler = MinMaxScaler(feature_range=(0, 1))
        self.k = k
        self.percentile = percentile
        self.threshold_method = threshold_method
        # Always define the attribute so a missing checkpoint surfaces as a
        # clear error in predict() instead of an AttributeError.
        self.model = self._load_model(checkpoint_path) if checkpoint_path else None

    def _load_model(self, checkpoint_path):
        """Build an ``LstmModel`` on ``self.device`` and load its state dict."""
        model = LstmModel(self.n_channels, self.z_size)
        model = model.to(self.device)
        # NOTE(review): torch.load unpickles the file; only load checkpoints
        # from trusted sources.
        # NOTE(review): eval mode is not set here; presumably ``testing``
        # handles that -- confirm.
        model.load_state_dict(torch.load(checkpoint_path, map_location=self.device))
        return model

    def create_test_sequences(self, dataframe, time_steps, news_or_cases='news'):
        """Scale one column and cut it into (window, next value) pairs.

        NOTE: the selected column of ``dataframe`` is overwritten in place
        with its min-max scaled values; ``predict`` relies on that to return
        the scaled column next to the forecasts.

        Args:
            dataframe: frame containing the column ``news_or_cases``.
            time_steps: number of consecutive rows per input sequence.
            news_or_cases: column to use, either ``'news'`` or ``'cases'``.

        Returns:
            Tuple ``(X, y)``: ``X`` has shape ``(n, time_steps, 1)`` and ``y``
            has shape ``(n,)`` with ``n = len(dataframe) - time_steps``;
            ``y[i]`` is the scaled value following window ``X[i]``.

        Raises:
            ValueError: if the column name is not allowed, or the frame has
                no more than ``time_steps`` rows (no sequence can be built).
        """
        if news_or_cases not in ('news', 'cases'):
            raise ValueError("news_or_cases should be either 'news' or 'cases'")
        n_sequences = len(dataframe) - time_steps
        if n_sequences <= 0:
            raise ValueError(
                f"need more than {time_steps} rows to build a sequence, "
                f"got {len(dataframe)}"
            )
        dataframe[[news_or_cases]] = self.scaler.fit_transform(dataframe[[news_or_cases]])
        norm = np.array(dataframe[[news_or_cases]]).astype(float)
        windows = np.stack([norm[i:i + time_steps, :] for i in range(n_sequences)])
        targets = norm[time_steps:, 0].copy()
        return windows, targets

    def prepare_input_dataframe(self, dataframe, news_column_name='news'):
        """Wrap the sequences of ``dataframe`` in an unshuffled ``DataLoader``.

        Scales ``dataframe`` in place (see ``create_test_sequences``).

        Returns:
            Tuple ``(test_loader, y_test)`` where ``y_test`` is the array of
            scaled ground-truth values, one per sequence.
        """
        X_test, y_test = self.create_test_sequences(dataframe, self.window, news_column_name)
        dataset = data_utils.TensorDataset(
            torch.from_numpy(X_test).float(),
            torch.from_numpy(y_test).float()
        )
        test_loader = data_utils.DataLoader(
            dataset,
            batch_size=self.batch_size,
            shuffle=False,  # keep chronological order so forecasts line up with rows
            num_workers=0
        )
        return test_loader, y_test

    def predict(self, dataframe, news_column_name='news'):
        """Forecast every row after the first ``window`` rows and compute losses.

        The input frame is not modified; scaling happens on an internal copy.

        Returns:
            Copy of ``dataframe[window:]`` (with the selected column scaled)
            plus the columns ``y_test``, ``pred_forec``, ``abs_loss``
            (``|y_test - pred_forec|``), ``rel_loss``
            (``|pred_forec - y_test| / (1 + pred_forec)``) and ``diff``
            (``y_test - pred_forec``).

        Raises:
            RuntimeError: if no model has been loaded or assigned.
            ValueError: propagated from ``create_test_sequences``.
        """
        model = getattr(self, 'model', None)
        if model is None:
            raise RuntimeError(
                "no model loaded: pass checkpoint_path to the constructor "
                "or assign the 'model' attribute before predicting"
            )
        # Work on a copy so the caller's frame is not rescaled as a side effect.
        dataframe = dataframe.copy()
        test_loader, y_test = self.prepare_input_dataframe(dataframe, news_column_name)
        # Presumably ``w`` is a sequence with one forecast tensor per batch --
        # that is how the values are consumed below; verify against ``testing``.
        _, w = testing(model, test_loader, self.device)
        # Flatten each batch before concatenating: the last batch may be
        # shorter, and there may be only a single batch.
        forecast_test = torch.cat([batch.flatten() for batch in w]).detach().cpu().numpy()
        test_df = dataframe[self.window:].copy()
        test_df['y_test'] = y_test
        test_df['pred_forec'] = forecast_test
        test_df['abs_loss'] = np.abs(test_df.y_test - test_df.pred_forec)
        test_df['rel_loss'] = np.abs(
            (test_df['pred_forec'] - test_df['y_test']) / (1 + test_df['pred_forec'])
        )
        test_df['diff'] = test_df['y_test'] - test_df['pred_forec']
        return test_df

    @staticmethod
    def _iqr_rolling(timeseries, k):
        """Return the upper IQR fence ``Q3 + k * (Q3 - Q1)`` of ``timeseries``."""
        q1, q3 = np.percentile(timeseries, [25, 75])
        iqr = q3 - q1
        return q3 + k * iqr

    def _window_flags(self, losses, threshold_fn):
        """Flag values exceeding the threshold of any rolling window they fall in.

        Windows of ``self.window`` consecutive values start at positions
        ``0 .. len(losses) - window - 1``, so the last value is never part of
        a window and gets no flag.

        Args:
            losses: pandas Series of loss values.
            threshold_fn: callable mapping a window's value array to a scalar.

        Returns:
            Integer Series (1 = flagged in at least one window, else 0)
            indexed by the labels of all but the last element of ``losses``;
            empty when there are no more values than ``self.window``.
        """
        values = losses.to_numpy(dtype=float)
        n_windows = len(values) - self.window
        if n_windows <= 0:
            return pd.Series([], index=losses.index[:0], dtype=int)
        flagged = np.zeros(len(values) - 1, dtype=bool)
        for start in range(n_windows):
            stop = start + self.window
            chunk = values[start:stop]
            flagged[start:stop] |= chunk > threshold_fn(chunk)
        return pd.Series(flagged.astype(int), index=losses.index[:-1])

    def windowed_iqr(self, df, k, type_of_loss='diff'):
        """Flag rows whose loss exceeds a rolling upper IQR fence.

        Args:
            df: frame containing the column ``type_of_loss``.
            k: IQR multiplier.
            type_of_loss: name of the loss column to threshold.

        Returns:
            Dict mapping ``str(index label)`` to 0/1 for every row except the
            last one (see ``_window_flags``).
        """
        flags = self._window_flags(
            df[type_of_loss], lambda chunk: self._iqr_rolling(chunk, k)
        )
        return {f'{label}': int(flag) for label, flag in flags.items()}

    def get_perc_threshold(self, test_df, percentile, col='abs_loss'):
        """Flag rows whose loss exceeds a rolling percentile threshold.

        Args:
            test_df: frame with ``abs_loss`` (for ``col='abs_loss'``) or with
                ``y_test`` and ``pred_forec`` (for ``col='loss'``).
            percentile: percentile of each window used as its threshold.
            col: ``'abs_loss'`` thresholds the absolute loss; ``'loss'``
                thresholds the raw loss ``y_test - pred_forec``.

        Returns:
            Copy of ``test_df`` without its last row, with the 0/1 flags in
            ``anom_perc_abs_loss`` (``col='abs_loss'``) or
            ``anom_perc_diff_gt_pred`` (``col='loss'``).

        Raises:
            ValueError: if ``col`` is neither ``'abs_loss'`` nor ``'loss'``.
        """
        if col not in ('abs_loss', 'loss'):
            raise ValueError("col should be either 'abs_loss' or 'loss'")
        if col == 'abs_loss':
            losses = test_df['abs_loss']
            new_col = 'anom_perc_abs_loss'
        else:
            # There is no stored 'loss' column; the raw loss is derived here.
            losses = test_df['y_test'] - test_df['pred_forec']
            new_col = 'anom_perc_diff_gt_pred'
        flags = self._window_flags(
            losses, lambda chunk: np.percentile(chunk, percentile)
        )
        result = test_df[:-1].copy()
        # The flags carry the frame's own index labels, so this aligns for
        # any index type.
        result[new_col] = flags
        return result

    def postprocess_anomalies(self, test_df, new_col, old_col, news_or_cases):
        """Clear anomaly flags on rows where the series is decreasing.

        Adds ``derivative`` (first difference of ``news_or_cases``, 0 for the
        first row) and ``new_col``, which equals ``old_col`` except that a
        flag of 1 becomes 0 wherever ``derivative`` is negative.

        Returns:
            A modified copy; ``test_df`` itself is left untouched.
        """
        test_df = test_df.copy()
        test_df['derivative'] = test_df[news_or_cases].diff().fillna(0)
        falling_flag = (test_df['derivative'] < 0) & (test_df[old_col] == 1)
        test_df[new_col] = test_df[old_col].mask(falling_flag, 0)
        return test_df

    def detect_anomalies(self, test_df, news_or_cases='news'):
        """Run the forecast and flag anomalies with ``self.threshold_method``.

        Methods:
            0: IQR on (ground truth - forecast)
            1: IQR on |ground truth - forecast|
            2: IQR on |ground truth - forecast| / (1 + forecast)
            3: Percentile threshold on absolute loss
            4: Percentile threshold on raw loss

        Uses the instance parameters ``k`` (methods 0-2), ``percentile``
        (methods 3-4) and ``threshold_method``.

        Args:
            test_df: input frame; it is not modified.
            news_or_cases: column to analyse, ``'news'`` or ``'cases'``.

        Returns:
            Tuple ``(frame, label_column)``: the prediction frame extended
            with raw and post-processed flag columns, and the name of the
            post-processed column. For methods 0-2 the last row has no flag
            (NaN); for methods 3-4 the last row is dropped.

        Raises:
            ValueError: if ``threshold_method`` is not one of 0-4.
        """
        method = self.threshold_method
        # Validate before running the model so a typo fails fast.
        if method not in self._IQR_METHODS and method not in self._PERCENTILE_METHODS:
            raise ValueError("threshold_method must be between 0 and 4")
        # predict() copies the frame, so the caller's data stays untouched.
        test = self.predict(test_df, news_column_name=news_or_cases)
        if method in self._IQR_METHODS:
            loss_type, flag_col, new_label = self._IQR_METHODS[method]
            test[flag_col] = self._window_flags(
                test[loss_type], lambda chunk: self._iqr_rolling(chunk, self.k)
            )
        else:
            loss_type, flag_col, new_label = self._PERCENTILE_METHODS[method]
            test = self.get_perc_threshold(test, self.percentile, loss_type)
        test = self.postprocess_anomalies(test, new_label, flag_col, news_or_cases)
        return test, new_label
|