# JuanJoseMV's picture
# add methods for each strategy
# 9485251
import pandas as pd
import numpy as np
# Name of the column that holds the final (post-processed) 0/1 anomaly label.
NEW_LABEL_COLUMN_NAME = 'new_label'


class IQRforOutbreakDetection:
    """Flag peaks in a time series with a sliding-window IQR upper fence.

    Every window of ``window_size`` consecutive points gets its own upper
    bound ``Q3 + k * IQR``. A point is a peak if it exceeds that bound in at
    least one of the windows that contain it. Peaks that sit on a falling
    value (negative first difference) are then cleared in the final label.
    """

    def __init__(self, window_size=7, stride=1, k=1.5):
        """Store the detector settings.

        Args:
            window_size: Number of consecutive points per window (>= 1).
            stride: Step between successive window starts (>= 1).
            k: Multiplier applied to the IQR when computing the bounds.

        Raises:
            ValueError: If ``window_size`` or ``stride`` is smaller than 1.
        """
        if window_size < 1:
            raise ValueError(f'window_size must be >= 1, got {window_size}')
        if stride < 1:
            raise ValueError(f'stride must be >= 1, got {stride}')
        self.window_size = window_size
        self.stride = stride
        self.k = k

    def _iqr_rolling(self, timeseries):
        """Return ``(upper_bound, lower_bound)`` of the IQR fence for one window."""
        q1, q3 = np.percentile(timeseries, [25, 75])
        iqr = q3 - q1
        ub = q3 + self.k * iqr
        lb = q1 - self.k * iqr
        return ub, lb

    def detect_anomalies(self, df, news_or_cases='news'):
        """Run the full detection pipeline on one series.

        Args:
            df: Either a ``pd.Series`` of values, or an object indexable by
                ``news_or_cases`` (e.g. a DataFrame) that yields the series.
            news_or_cases: Column to analyse. It is also the name given to
                the value column of the returned frame.

        Returns:
            A tuple ``(frame, label_column)``. ``frame`` holds the value
            column, ``'peaks'`` (raw 0/1 flags), ``'derivative'`` (first
            difference) and the final label column. ``label_column`` is
            ``NEW_LABEL_COLUMN_NAME``.
        """
        if isinstance(df, pd.Series):
            # Name the series after the requested column so the downstream
            # lookup by ``news_or_cases`` works whatever the series was called.
            timeseries = df.rename(news_or_cases)
        else:
            timeseries = df[news_or_cases]
        _, final_peaks, _ = self._windowed_iqr(timeseries)
        result_df = self._prepare_resulting_dataframe(final_peaks, timeseries)
        processed_df = self._postprocess_anomalies(result_df, news_or_cases)
        return processed_df, NEW_LABEL_COLUMN_NAME

    def _windowed_iqr(self, df):
        """Collect per-window peak votes for every point of the series.

        Args:
            df: The ``pd.Series`` to scan.

        Returns:
            A tuple ``(tot_peaks, final_peaks, end_id)``. ``tot_peaks`` maps
            the string form of each index label to the list of per-window
            votes. ``final_peaks`` maps the same keys to ``True`` when any
            vote was positive. ``end_id`` is the exclusive end position of
            the last window, or 0 when the series is shorter than one window.
        """
        # Seed every label so points that no window reaches (series shorter
        # than the window, or a tail skipped by the stride) still get an
        # explicit "not a peak" verdict instead of being missing.
        tot_peaks = {f'{label}': [] for label in df.index}
        end_id = 0
        for start in range(0, len(df) - self.window_size + 1, self.stride):
            end_id = start + self.window_size
            window = df.iloc[start:end_id]
            ub, _ = self._iqr_rolling(window)
            # Iterate (label, value) pairs rather than ``.loc`` lookups so
            # duplicated index labels cannot return a whole Series.
            for label, value in window.items():
                tot_peaks[f'{label}'].append(bool(value > ub))
        final_peaks = {label: any(votes) for label, votes in tot_peaks.items()}
        return tot_peaks, final_peaks, end_id

    def _prepare_resulting_dataframe(self, peaks_df, news_or_cases_df):
        """Build a frame from the series and attach a 0/1 ``'peaks'`` column.

        Args:
            peaks_df: Mapping from the string form of an index label to a
                boolean peak flag (the ``final_peaks`` of ``_windowed_iqr``).
            news_or_cases_df: Series (or frame) holding the analysed values.

        Returns:
            A new DataFrame with the input data plus the ``'peaks'`` column.
        """
        # Copy so the new columns never leak back into the caller's data.
        dff = pd.DataFrame(news_or_cases_df).copy()
        # Match flags to rows by label, not by position, so a missing entry
        # becomes 0 instead of a length-mismatch error.
        dff['peaks'] = np.array(
            [int(peaks_df.get(f'{label}', False)) for label in dff.index],
            dtype='int64',
        )
        return dff

    def _postprocess_anomalies(self, dataframe, col_name='news'):
        """Clear peaks that fall on a decreasing value.

        Adds ``'derivative'`` (first difference of ``col_name``, 0 for the
        first row) and the final label column to ``dataframe`` in place.
        The label equals ``'peaks'`` except that a peak with a negative
        derivative is set to 0.

        Args:
            dataframe: Frame with ``col_name`` and ``'peaks'`` columns.
            col_name: Name of the value column.

        Returns:
            The same ``dataframe`` object, with the two columns added.
        """
        dataframe['derivative'] = dataframe[col_name].diff().fillna(0)
        falling_peak = (dataframe['derivative'] < 0) & (dataframe['peaks'] == 1)
        dataframe[NEW_LABEL_COLUMN_NAME] = dataframe['peaks'].mask(falling_peak, 0)
        return dataframe