import random from datetime import datetime, timedelta, date, time import pandas as pd import numpy as np from typing import List, Iterator, Dict, Any, Optional def generate_random_data( date: date, start_time: time, end_time: time, count: int, response_time_range: (int, int), null_percentage: float ) -> pd.DataFrame: start_datetime: datetime = datetime.combine(date, start_time) end_datetime: datetime = datetime.combine(date, end_time) random_timestamps: List[datetime] = [ start_datetime + timedelta(seconds=random.randint(0, int((end_datetime - start_datetime).total_seconds()))) for _ in range(count) ] random_timestamps.sort() random_response_times: List[Optional[int]] = [ random.randint(response_time_range[0], response_time_range[1]) for _ in range(count) ] null_count: int = int(null_percentage * count) null_indices: List[int] = random.sample(range(count), null_count) for idx in null_indices: random_response_times[idx] = None data: Dict[str, Any] = { 'timestamp': random_timestamps, 'ResponseTime(ms)': random_response_times } df: pd.DataFrame = pd.DataFrame(data) return df def calculate_percentile( df: pd.DataFrame, freq: str, percentile: float ) -> pd.DataFrame: percentile_df: pd.DataFrame = df.groupby(pd.Grouper(key='timestamp', freq=freq))["ResponseTime(ms)"]\ .quantile(percentile).reset_index(name=f"p{int(percentile * 100)}_ResponseTime(ms)") percentile_df.replace(to_replace=np.nan, value=None, inplace=True) return percentile_df def aggregate_data( df: pd.DataFrame, period_length: str, ) -> pd.DataFrame: if df.empty: return pd.DataFrame() # Return an empty DataFrame if input is empty aggregation_funcs = { 'p50': lambda x: np.percentile(x.dropna(), 50) if not x.dropna().empty else np.nan, 'p95': lambda x: np.percentile(x.dropna(), 95) if not x.dropna().empty else np.nan, 'p99': lambda x: np.percentile(x.dropna(), 99) if not x.dropna().empty else np.nan, 'max': lambda x: np.max(x.dropna()) if not x.dropna().empty else np.nan, 'min': lambda x: np.min(x.dropna()) if not x.dropna().empty else np.nan, 'average': lambda x: np.mean(x.dropna()) if not x.dropna().empty else np.nan } summary_df = df.groupby(pd.Grouper(key='timestamp', freq=period_length)).agg( p50=('ResponseTime(ms)', aggregation_funcs['p50']), p95=('ResponseTime(ms)', aggregation_funcs['p95']), p99=('ResponseTime(ms)', aggregation_funcs['p99']), max=('ResponseTime(ms)', aggregation_funcs['max']), min=('ResponseTime(ms)', aggregation_funcs['min']), average=('ResponseTime(ms)', aggregation_funcs['average']), ).reset_index() return summary_df def re_aggregate_data( df: pd.DataFrame, period_length: str, ) -> pd.DataFrame: if df.empty: return pd.DataFrame() # Return an empty DataFrame if input is empty aggregation_funcs = { 'p50': lambda x: np.percentile(x.dropna(), 50) if not x.dropna().empty else np.nan, 'p95': lambda x: np.percentile(x.dropna(), 95) if not x.dropna().empty else np.nan, 'p99': lambda x: np.percentile(x.dropna(), 99) if not x.dropna().empty else np.nan, 'max': lambda x: np.max(x.dropna()) if not x.dropna().empty else np.nan, 'min': lambda x: np.min(x.dropna()) if not x.dropna().empty else np.nan, 'average': lambda x: np.mean(x.dropna()) if not x.dropna().empty else np.nan } summary_df = df.groupby(pd.Grouper(key='timestamp', freq=period_length)).agg( p50=('p50', aggregation_funcs['p50']), p95=('p95', aggregation_funcs['p95']), p99=('p99', aggregation_funcs['p99']), max=('max', aggregation_funcs['max']), min=('min', aggregation_funcs['min']), average=('average', aggregation_funcs['average']), ).reset_index() return summary_df def downsample(df, period_minutes): # Create a new datetime index at specified intervals freq_str = f'{period_minutes}T' new_index = pd.date_range(start=df['timestamp'].min(), end=df['timestamp'].max(), freq=freq_str) # Create an empty DataFrame with the new index df_downsampled = pd.DataFrame(index=new_index) # Set the original DataFrame's index to the timestamp column df.set_index('timestamp', inplace=True) # Interpolate the values for each column for column in df.columns: df_downsampled[column] = df[column].resample(freq_str).interpolate(method='linear') # Reset index to have timestamp as a column again df_downsampled.reset_index(inplace=True) df_downsampled.rename(columns={'index': 'timestamp'}, inplace=True) return df_downsampled def chunk_list(input_list: List[Any], size: int = 3) -> Iterator[List[Any]]: while input_list: chunk: List[Any] = input_list[:size] yield chunk input_list = input_list[size:] def evaluate_alarm_state( summary_df: pd.DataFrame, threshold: int, datapoints_to_alarm: int, evaluation_range: int, aggregation_function: str, alarm_condition: str ) -> pd.DataFrame: data_points: List[Optional[float]] = list(summary_df[aggregation_function].values) data_table_dict: Dict[str, List[Any]] = { "DataPoints": [], "# of data points that must be filled": [], "MISSING": [], "IGNORE": [], "BREACHING": [], "NOT BREACHING": [] } def check_condition(value, threshold, condition): if condition == '>': return value > threshold elif condition == '>=': return value >= threshold elif condition == '<': return value < threshold elif condition == '<=': return value <= threshold for chunk in chunk_list(input_list=data_points, size=evaluation_range): data_point_repr: str = '' num_dp_that_must_be_filled: int = 0 for dp in chunk: if str(dp).lower() == "nan": dp_symbol = '⚫️' elif check_condition(dp, threshold, alarm_condition): dp_symbol = '🔴' else: dp_symbol = '🟢' data_point_repr += dp_symbol if len(chunk) < evaluation_range: data_point_repr += '⚫️' * (evaluation_range - len(chunk)) if data_point_repr.count('⚫️') > (evaluation_range - datapoints_to_alarm): num_dp_that_must_be_filled = datapoints_to_alarm - sum([data_point_repr.count('🟢'), data_point_repr.count('🔴')]) data_table_dict["DataPoints"].append(data_point_repr) data_table_dict["# of data points that must be filled"].append(num_dp_that_must_be_filled) if num_dp_that_must_be_filled > 0: data_table_dict["MISSING"].append("INSUFFICIENT_DATA" if data_point_repr.count('⚫️') == evaluation_range else "Retain current state") data_table_dict["IGNORE"].append("Retain current state") data_table_dict["BREACHING"].append("ALARM") data_table_dict["NOT BREACHING"].append("OK") else: data_table_dict["MISSING"].append("OK") data_table_dict["IGNORE"].append("Retain current state") data_table_dict["BREACHING"].append("ALARM" if '🔴' * datapoints_to_alarm in data_point_repr else "OK") data_table_dict["NOT BREACHING"].append("ALARM" if '🟢' * datapoints_to_alarm not in data_point_repr else "OK") return pd.DataFrame(data_table_dict)