|
import pandas as pd |
|
import numpy as np |
|
from statsmodels.tsa.stattools import adfuller, acf, pacf |
|
from statsmodels.tsa.arima.model import ARIMA |
|
|
|
|
|
# Column name that ARIMAforOutbreakDetection.detect_anomalies returns next to
# its result frame, telling the caller which column holds the anomaly flags.
# NOTE(review): the detector's post-processed flags (declines suppressed) are
# written to 'new_anomaly'; this constant points at the raw 'anomaly' flags.
# Confirm which column callers are meant to read.
NEW_ANOMALY_COLUMN_NAME = 'anomaly'
|
|
|
class ARIMAforOutbreakDetection:
    """Rolling-window ARIMA detector that flags large one-step forecast errors.

    For every window of ``window_size`` consecutive rows (advanced by
    ``stride`` rows) an ARIMA(p, 0, q) model is fitted and used to forecast
    the next row. A row is flagged when its absolute forecast error lies
    outside ``mu +/- k * sigma`` of the fitted model's in-sample residuals.
    A post-processing step then clears flags on rows where the monitored
    column decreased.
    """

    def __init__(self, window_size=7, stride=1, k=1.5, significance=0.05, max_lag=30):
        """Store the detector configuration.

        Args:
            window_size: Number of rows each ARIMA model is fitted on.
            stride: Step, in rows, between the starts of consecutive windows.
            k: Half-width of the non-anomalous band, in residual standard
                deviations.
            significance: p-value cut-off for the ADF stationarity test; also
                passed as ``alpha`` to ``acf``/``pacf``.
            max_lag: Number of lags computed for the ACF/PACF order search.
        """
        self.window_size = window_size
        self.stride = stride
        self.k = k
        self.significance = significance
        self.max_lag = max_lag

    def test_stationarity(self, ts_data, column=''):
        """Run an augmented Dickey-Fuller test and label the series.

        Args:
            ts_data: A ``pd.Series``, or a table from which ``column`` is
                selected.
            column: Column to test when ``ts_data`` is not a Series.

        Returns:
            ``"Stationary"`` if the ADF p-value is <= ``self.significance``,
            otherwise ``"Non-Stationary"``.
        """
        series = ts_data if isinstance(ts_data, pd.Series) else ts_data[column]
        p_value = adfuller(series, autolag='AIC')[1]
        return "Stationary" if p_value <= self.significance else "Non-Stationary"

    def make_stationary(self, dataframe, column):
        """Difference ``dataframe`` until ``column`` passes the ADF test.

        The input is returned unchanged when ``column`` is already stationary.
        Otherwise the whole frame (every column) is differenced, with NaNs
        filled with 0, up to five times. Differencing stops as soon as
        ``column`` tests stationary. If it never does, the fifth-order
        difference is returned.
        """
        if self.test_stationarity(dataframe, column) == "Stationary":
            return dataframe

        diff_series = dataframe.copy()
        for _ in range(5):
            diff_series = diff_series.diff().fillna(0)
            if self.test_stationarity(diff_series, column) == "Stationary":
                break
        return diff_series

    def create_windows(self, df):
        """Slice ``df`` into fixed-size windows and their next-row targets.

        Window ``j`` covers rows ``[j*stride, j*stride + window_size)`` and its
        target is row ``j*stride + window_size``.

        Returns:
            Tuple ``(windows, gts)`` of arrays shaped
            ``(n_windows, window_size, n_columns)`` and
            ``(n_windows, n_columns)``.

        Raises:
            ValueError: If ``df`` has no more than ``window_size`` rows, so no
                window would have a target row.
        """
        if len(df) <= self.window_size:
            raise ValueError(
                f"need more than window_size={self.window_size} rows, got {len(df)}"
            )
        windows, gts = [], []
        for start in range(0, len(df) - self.window_size, self.stride):
            end = start + self.window_size
            windows.append(df.iloc[start:end, :])
            gts.append(df.iloc[end, :])
        return np.stack(windows), np.stack(gts)

    def find_p_q(self, series):
        """Pick ARIMA orders (p, q) from the PACF and ACF of ``series``.

        Both functions are evaluated up to ``self.max_lag``. For each one, the
        order is the first lag ``i >= 1`` whose value is negative, or positive
        but below the ``1.96 / sqrt(N)`` band. If no such lag exists, the last
        lag index is used.

        Returns:
            Tuple ``(p, q)``, where ``p`` comes from the PACF and ``q`` from
            the ACF.
        """
        n_obs = len(series)
        # The confidence intervals returned next to the values are discarded;
        # the fixed 1.96 / sqrt(N) band below is used instead.
        acf_values, _ = acf(series, nlags=self.max_lag, alpha=self.significance, fft=False)
        pacf_values, _ = pacf(series, nlags=self.max_lag, alpha=self.significance)
        threshold = 1.96 / np.sqrt(n_obs)

        def find_last_consecutive_outlier(values):
            # NOTE(review): the loop returns the first *insignificant* lag, but
            # the fall-through returns the last index (all lags significant).
            # The conventional cut-off order would be ``i - 1``. In addition,
            # any negative value stops the search and an exact 0 does not.
            # Confirm this is intended.
            for i in range(1, len(values)):
                if values[i] < 0 or (values[i] > 0 and abs(values[i]) < threshold):
                    return i
            return len(values) - 1

        return find_last_consecutive_outlier(pacf_values), find_last_consecutive_outlier(acf_values)

    def detect_anomalies(self, dataset, news_or_cases='news'):
        """Run the full detection pipeline on ``dataset``.

        Args:
            dataset: Time-ordered table. Every column is differenced and windowed
                into the ARIMA fit, so presumably it should contain only
                ``news_or_cases``. TODO confirm.
            news_or_cases: Column used for the stationarity test, order
                selection and post-processing.

        Returns:
            Tuple ``(result_df, NEW_ANOMALY_COLUMN_NAME)``. ``result_df``
            contains the rows of ``dataset`` that were forecast (one per
            window), plus the diagnostic columns and the ``'anomaly'`` and
            ``'new_anomaly'`` flags.
        """
        stationary_data = self.make_stationary(dataset, news_or_cases)
        p, q = self.find_p_q(stationary_data[news_or_cases])
        anomalies, means, stdevs, residuals, predictions, gts = self._train_arima_model(stationary_data, p, q)
        result_df = self._prepare_resulting_dataframe(
            residuals, means, stdevs, self._target_rows(dataset),
            anomalies, gts, predictions,
        )
        # NOTE(review): callers receive NEW_ANOMALY_COLUMN_NAME (module level).
        # If it names the raw 'anomaly' column, the post-processed
        # 'new_anomaly' flags are not what the caller is pointed at. Confirm.
        return self._postprocess_anomalies(result_df, news_or_cases), NEW_ANOMALY_COLUMN_NAME

    def _target_rows(self, dataset):
        """Return the rows of ``dataset`` that ``create_windows`` uses as targets."""
        # Targets sit at window_size, window_size + stride, ... Slicing with
        # the same stride keeps these rows aligned one-to-one with the
        # per-window result lists. Slicing without it breaks for stride > 1.
        return dataset.iloc[self.window_size::self.stride]

    def _train_arima_model(self, dataset, p, q):
        """Fit one ARIMA(p, 0, q) per window and score its next-row forecast.

        Returns:
            ``(anomalies, means, stdevs, residuals, predictions, gts)``. The
            first five are lists with one entry per window; ``gts`` is the
            target array from ``create_windows``.
        """
        predictions, residuals, means, stdevs, anomalies = [], [], [], [], []
        windows, gts = self.create_windows(dataset)

        for window, gt in zip(windows, gts):
            model = ARIMA(window, order=(p, 0, q))
            model.initialize_approximate_diffuse()
            fit = model.fit()

            pred = fit.forecast(steps=1)[0]
            residual = np.abs(gt - pred)
            mu, std = np.mean(fit.resid), np.std(fit.resid)

            # NOTE(review): an *absolute* forecast error is compared against
            # the band of the *signed* in-sample residuals. Whenever
            # mu - k*std < 0, the lower bound can never trigger. Confirm this
            # one-sided behaviour is intended.
            anomalies.append(
                1 if residual > mu + self.k * std or residual < mu - self.k * std else 0
            )
            means.append(mu)
            stdevs.append(std)
            residuals.append(residual)
            predictions.append(pred)

        return anomalies, means, stdevs, residuals, predictions, gts

    def _prepare_resulting_dataframe(self, residuals, means, stdevs, original_dataset,
                                     anomalies, gts, predictions):
        """Copy ``original_dataset`` and attach the per-window results as columns.

        Every sequence must have one entry per row of ``original_dataset``.
        """
        result_df = original_dataset.copy()
        result_df['residuals'] = residuals
        result_df['mu'] = means
        result_df['sigma'] = stdevs
        result_df['anomaly'] = anomalies
        # Targets and predictions are on the (possibly differenced) scale fed
        # to ARIMA, hence the "_diff" suffix.
        result_df['gts_diff'] = gts
        result_df['pred_diff'] = predictions
        return result_df

    def _postprocess_anomalies(self, dataframe, col_name='news'):
        """Clear anomaly flags on rows where ``col_name`` decreased.

        Adds two columns to ``dataframe`` in place and returns it:
        ``derivative`` is the first difference of ``col_name`` (first row 0).
        ``new_anomaly`` is ``anomaly`` with flags set to 0 wherever
        ``derivative`` is negative.
        """
        dataframe['derivative'] = dataframe[col_name].diff().fillna(0)
        # Vectorised equivalent of the former per-row iterrows() loop.
        flagged_while_falling = (dataframe['derivative'] < 0) & (dataframe['anomaly'] == 1)
        dataframe['new_anomaly'] = dataframe['anomaly'].mask(flagged_while_falling, 0)
        return dataframe