|
import pandas as pd |
|
import numpy as np |
|
|
|
|
|
# Name of the column that holds the post-processed anomaly labels; it is
# returned by IQRforOutbreakDetection.detect_anomalies next to the result frame.
NEW_LABEL_COLUMN_NAME = 'new_label'
|
|
|
class IQRforOutbreakDetection:
    """Flag upward outliers in a time series with a sliding-window IQR rule.

    A point is marked as a peak when, in at least one window that contains
    it, its value is strictly greater than ``Q3 + k * IQR`` computed over
    that window.

    Parameters
    ----------
    window_size : int, default 7
        Number of consecutive observations in each window.
    stride : int, default 1
        Step, in observations, between the starts of consecutive windows.
    k : float, default 1.5
        Multiplier applied to the IQR when building the fences.

    Raises
    ------
    ValueError
        If ``window_size`` or ``stride`` is smaller than 1.
    """

    def __init__(self, window_size=7, stride=1, k=1.5):
        # Fail early: a zero stride breaks ``range`` and a non-positive
        # window or negative stride would produce no usable windows.
        if window_size < 1:
            raise ValueError(f'window_size must be >= 1, got {window_size!r}')
        if stride < 1:
            raise ValueError(f'stride must be >= 1, got {stride!r}')
        self.window_size = window_size
        self.stride = stride
        self.k = k

    def _iqr_rolling(self, timeseries):
        """Return the ``(upper, lower)`` IQR fences for one window.

        The fences are ``Q3 + k * IQR`` and ``Q1 - k * IQR`` where Q1/Q3 are
        the 25th/75th percentiles of ``timeseries``.
        """
        q1, q3 = np.percentile(timeseries, [25, 75])
        iqr = q3 - q1
        ub = q3 + self.k * iqr
        lb = q1 - self.k * iqr
        return ub, lb

    def detect_anomalies(self, df, news_or_cases='news'):
        """Label the peaks of a time series.

        Parameters
        ----------
        df : pandas.Series or pandas.DataFrame
            The series itself, or a frame that contains it as a column.
        news_or_cases : str, default 'news'
            Column of ``df`` to analyse. When ``df`` is a Series, the value
            column of the result is given this name.

        Returns
        -------
        tuple
            ``(frame, label_column)``. ``frame`` holds the analysed column
            plus ``'peaks'`` (1 where the value exceeded the upper fence in
            some window, else 0), ``'derivative'`` (first difference, 0 for
            the first row) and the label column, which equals ``'peaks'``
            except that peaks with a negative derivative are reset to 0.
            ``label_column`` is ``NEW_LABEL_COLUMN_NAME``.

        Notes
        -----
        Observations that fall in no complete window (series shorter than
        ``window_size``, or a tail skipped because of ``stride``) are
        labelled 0. The input object is not modified.
        """
        if isinstance(df, pd.Series):
            # Later steps look the values up by ``news_or_cases``, so make
            # sure the series actually carries that name.
            timeseries = df.rename(news_or_cases)
        else:
            timeseries = df[news_or_cases]

        _, final_peaks, _ = self._windowed_iqr(timeseries)
        result_df = self._prepare_resulting_dataframe(final_peaks, timeseries)
        processed_df = self._postprocess_anomalies(result_df, news_or_cases)

        return processed_df, NEW_LABEL_COLUMN_NAME

    def _windowed_iqr(self, df):
        """Slide a window over the series and collect per-point peak votes.

        Parameters
        ----------
        df : pandas.Series
            Series to scan.

        Returns
        -------
        tuple
            ``(tot_peaks, final_peaks, end_id)`` where ``tot_peaks`` maps the
            string form of every index label to the list of booleans it got
            (one per window containing it), ``final_peaks`` maps the same
            keys to ``True`` when any vote is ``True``, and ``end_id`` is the
            exclusive end position of the last window (0 if there was none).
            Index labels with the same string form share one entry.
        """
        # Seed every label so points that no window covers still get an
        # entry (and therefore a ``False`` verdict) instead of going missing.
        tot_peaks = {f'{label}': [] for label in df.index}
        end_id = 0

        for start in range(0, len(df) - self.window_size + 1, self.stride):
            end_id = start + self.window_size
            window = df.iloc[start:end_id]
            ub, _ = self._iqr_rolling(window)

            # ``items`` yields scalar values even when labels repeat.
            for label, value in window.items():
                tot_peaks[f'{label}'].append(bool(value > ub))

        final_peaks = {label: any(votes) for label, votes in tot_peaks.items()}

        return tot_peaks, final_peaks, end_id

    def _prepare_resulting_dataframe(self, peaks_df, news_or_cases_df):
        """Combine the series with its peak flags.

        Parameters
        ----------
        peaks_df : dict
            Mapping from the string form of an index label to a boolean, as
            produced in ``final_peaks`` by ``_windowed_iqr``.
        news_or_cases_df : pandas.Series or pandas.DataFrame
            The analysed data.

        Returns
        -------
        pandas.DataFrame
            A copy of the data with an integer ``'peaks'`` column (1/0).
            Labels missing from ``peaks_df`` are treated as non-peaks.
        """
        # Copy so that adding columns can never touch the caller's data.
        dff = pd.DataFrame(news_or_cases_df).copy()
        dff['peaks'] = np.array(
            [int(bool(peaks_df.get(f'{label}', False))) for label in dff.index],
            dtype=np.int64,
        )
        return dff

    def _postprocess_anomalies(self, dataframe, col_name='news'):
        """Drop peaks that sit on a falling value.

        Adds a ``'derivative'`` column (first difference of ``col_name``,
        with the first row set to 0) and the ``NEW_LABEL_COLUMN_NAME``
        column, which copies ``'peaks'`` but is 0 wherever a peak has a
        negative derivative. ``dataframe`` is modified in place and returned.
        """
        dataframe['derivative'] = dataframe[col_name].diff().fillna(0)
        falling_peak = (dataframe['derivative'] < 0) & (dataframe['peaks'] == 1)
        dataframe[NEW_LABEL_COLUMN_NAME] = dataframe['peaks'].mask(falling_peak, 0)
        return dataframe