File size: 1,939 Bytes
3d3d712 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
import pandas as pd
from pandas.api.types import is_numeric_dtype
from taskweaver.plugin import Plugin, register_plugin
@register_plugin
class AnomalyDetectionPlugin(Plugin):
    """Plugin that flags outliers in a time series using a 3-sigma rule."""

    def __call__(self, df: pd.DataFrame, time_col_name: str, value_col_name: str):
        """
        Identify anomalies in a dataframe holding a time series.

        A value is an anomaly when it lies more than three standard deviations
        away from the mean of the value column. The input dataframe is not
        modified; all work happens on a copy.

        :param df: the input data, must be a dataframe
        :param time_col_name: name of the column that contains the datetime
        :param value_col_name: name of the column that contains the numeric values.
        :return: a tuple ``(df, description)`` where ``df`` is a copy of the
            input with the time column converted to datetime, the value column
            converted to float if it was not numeric, and an additional
            boolean "Is_Anomaly" column; ``description`` is a sentence stating
            how many anomalies were found. Returns ``None`` (after printing a
            message) when the time column cannot be converted to datetime or
            the value column cannot be converted to a numeric type.

        Side effect: the resulting dataframe is registered as an artifact
        named "anomaly_detection_results" through ``self.ctx.add_artifact``.
        """
        # Work on a copy so the caller's dataframe is left untouched, matching
        # the documented promise of returning a *new* dataframe.
        result = df.copy()

        # Broad catch kept on purpose: any failure here (missing column,
        # unparseable values, ...) is reported the same way.
        try:
            result[time_col_name] = pd.to_datetime(result[time_col_name])
        except Exception:
            print("Time column is not datetime")
            return

        if not is_numeric_dtype(result[value_col_name]):
            try:
                result[value_col_name] = result[value_col_name].astype(float)
            except (ValueError, TypeError):
                # astype(float) raises ValueError for unparseable strings and
                # TypeError for objects that float() cannot accept at all.
                print("Value column is not numeric")
                return

        values = result[value_col_name]
        mean, std = values.mean(), values.std()
        cutoff = std * 3  # three-sigma band around the mean
        lower, upper = mean - cutoff, mean + cutoff

        # Vectorized comparison; missing values compare as not anomalous.
        result["Is_Anomaly"] = (values < lower) | (values > upper)

        anomaly_count = int(result["Is_Anomaly"].sum())
        description = f"There are {anomaly_count} anomalies in the time series data"

        self.ctx.add_artifact(
            name="anomaly_detection_results",
            file_name="anomaly_detection_results.csv",
            type="df",
            val=result,
        )

        return result, description
|