cloudwatch-simulator / concattedfiles.py
phitoduck's picture
almost working
755ac75
raw
history blame
15.7 kB
.
├── streamlit_app.py
└── utils.py
1 directory, 2 files
# File: ./streamlit_app.py
import streamlit as st
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, time, date
from typing import List, Dict, Any, Tuple
from utils import generate_random_data, calculate_percentile, evaluate_alarm_state, aggregate_data
# Constants
HARD_CODED_DATE = date(2024, 7, 26)
def main():
st.title("Streamlit App for Data Generation and Analysis")
# Initialize session state
initialize_session_state()
# Section 1 - Generate random data
st.header("Section 1 - Generate Random Data")
generate_data_form()
if not st.session_state.df.empty:
display_dataframe("Raw Event Data", st.session_state.df)
# Section 2 - Calculate Percentile
st.header("Section 2 - Calculate Percentile")
percentile_form()
if not st.session_state.percentile_df.empty:
display_dataframe("Aggregated Summary Data", st.session_state.percentile_df)
# Section 3 - Summary Data Aggregated by Period
st.header("Section 3 - Summary Data Aggregated by Period")
summary_by_period_form()
if not st.session_state.summary_by_period_df.empty:
display_dataframe("Summary Data Aggregated by Period", st.session_state.summary_by_period_df)
# Section 4 - Evaluate Alarm State
st.header("Section 4 - Evaluate Alarm State")
alarm_state_form()
if not st.session_state.alarm_state_df.empty:
plot_time_series(st.session_state.summary_by_period_df, st.session_state.threshold_input, st.session_state.alarm_condition_input, st.session_state.evaluation_range_input)
display_alarm_state_evaluation(st.session_state.alarm_state_df)
display_key_tables()
def initialize_session_state() -> None:
if 'df' not in st.session_state:
st.session_state.df = pd.DataFrame()
if 'percentile_df' not in st.session_state:
st.session_state.percentile_df = pd.DataFrame()
if 'summary_by_period_df' not in st.session_state:
st.session_state.summary_by_period_df = pd.DataFrame()
if 'alarm_state_df' not in st.session_state:
st.session_state.alarm_state_df = pd.DataFrame()
def generate_data_form() -> None:
with st.form(key='generate_data_form'):
start_time_input = st.time_input("Start Time", time(12, 0), help="Select the start time for generating random data.")
end_time_input = st.time_input("End Time", time(12, 30), help="Select the end time for generating random data.")
count_input = st.slider("Count", min_value=1, max_value=200, value=60, help="Specify the number of data points to generate.")
response_time_range_input = st.slider("Response Time Range (ms)", min_value=50, max_value=300, value=(100, 250), help="Select the range of response times in milliseconds.")
null_percentage_input = st.slider("Null Percentage", min_value=0.0, max_value=1.0, value=0.5, help="Select the percentage of null values in the generated data.")
submit_button = st.form_submit_button(label='Generate Data')
if submit_button:
st.session_state.df = generate_random_data(
date=HARD_CODED_DATE,
start_time=start_time_input,
end_time=end_time_input,
count=count_input,
response_time_range=response_time_range_input,
null_percentage=null_percentage_input
)
def percentile_form() -> None:
freq_input = st.selectbox("Period (bin)", ['1min', '5min', '15min'], key='freq_input', help="Select the frequency for aggregating the data.")
percentile_input = st.slider("Percentile", min_value=0.0, max_value=1.0, value=0.95, key='percentile_input', help="Select the percentile for calculating the aggregated summary data.")
if not st.session_state.df.empty:
st.session_state.percentile_df = calculate_percentile(st.session_state.df, freq_input, percentile_input)
def summary_by_period_form() -> None:
period_length_input = st.selectbox("Period Length", ['1min', '5min', '15min'], key='period_length_input', help="Select the period length for aggregating the summary data.")
if not st.session_state.df.empty:
st.session_state.summary_by_period_df = aggregate_data(st.session_state.df, period_length_input)
def alarm_state_form() -> None:
threshold_input = st.number_input("Threshold (ms)", min_value=50, max_value=300, value=150, key='threshold_input', help="Specify the threshold value for evaluating the alarm state.")
datapoints_to_alarm_input = st.number_input("Datapoints to Alarm", min_value=1, value=3, key='datapoints_to_alarm_input', help="Specify the number of data points required to trigger an alarm.")
evaluation_range_input = st.number_input("Evaluation Range", min_value=1, value=5, key='evaluation_range_input', help="Specify the range of data points to evaluate for alarm state.")
aggregation_function_input = st.selectbox(
"Aggregation Function",
['p50', 'p95', 'p99', 'max', 'min', 'average'],
key='aggregation_function_input',
help="Select the aggregation function for visualizing the data and computing alarms."
)
alarm_condition_input = st.selectbox(
"Alarm Condition",
['>', '>=', '<', '<='],
key='alarm_condition_input',
help="Select the condition for evaluating the alarm state."
)
if not st.session_state.summary_by_period_df.empty:
st.session_state.alarm_state_df = evaluate_alarm_state(
summary_df=st.session_state.summary_by_period_df,
threshold=threshold_input,
datapoints_to_alarm=datapoints_to_alarm_input,
evaluation_range=evaluation_range_input,
aggregation_function=aggregation_function_input,
alarm_condition=alarm_condition_input
)
def display_dataframe(title: str, df: pd.DataFrame) -> None:
st.write(title)
st.dataframe(df)
def plot_time_series(df: pd.DataFrame, threshold: int, alarm_condition: str, evaluation_range: int) -> None:
timestamps = df['Timestamp']
response_times = df[st.session_state.aggregation_function_input]
segments = []
current_segment = {'timestamps': [], 'values': []}
for timestamp, value in zip(timestamps, response_times):
if pd.isna(value):
if current_segment['timestamps']:
segments.append(current_segment)
current_segment = {'timestamps': [], 'values': []}
else:
current_segment['timestamps'].append(timestamp)
current_segment['values'].append(value)
if current_segment['timestamps']:
segments.append(current_segment)
fig, ax1 = plt.subplots()
color = 'tab:blue'
ax1.set_xlabel('Timestamp')
ax1.set_ylabel('Response Time (ms)', color=color)
for segment in segments:
ax1.plot(segment['timestamps'], segment['values'], color=color, linewidth=0.5)
ax1.scatter(segment['timestamps'], segment['values'], color=color, s=10)
line_style = '--' if alarm_condition in ['<', '>'] else '-'
ax1.axhline(y=threshold, color='r', linestyle=line_style, linewidth=0.8, label='Threshold')
ax1.tick_params(axis='y', labelcolor=color)
if alarm_condition in ['<=', '<']:
ax1.fill_between(timestamps, 0, threshold, color='pink', alpha=0.3)
else:
ax1.fill_between(timestamps, threshold, response_times.max(), color='pink', alpha=0.3)
period_indices = range(len(df))
ax2 = ax1.twiny()
ax2.set_xticks(period_indices)
ax2.set_xticklabels(period_indices, fontsize=8)
ax2.set_xlabel('Time Periods', fontsize=8)
ax2.xaxis.set_tick_params(width=0.5)
for idx in period_indices:
if idx % evaluation_range == 0:
ax1.axvline(x=df['Timestamp'].iloc[idx], color='green', linestyle='-', alpha=0.3)
max_value = max(filter(lambda x: x is not None, df[st.session_state.aggregation_function_input]))
ax1.text(df['Timestamp'].iloc[idx], max_value * 0.95, f"[{idx // evaluation_range}]", rotation=90, verticalalignment='bottom', color='grey', alpha=0.7, fontsize=8)
else:
ax1.axvline(x=df['Timestamp'].iloc[idx], color='grey', linestyle='--', alpha=0.3)
ax1.annotate('Alarm threshold', xy=(0.98, threshold), xycoords=('axes fraction', 'data'), ha='right', va='bottom', fontsize=8, color='red', backgroundcolor='none')
fig.tight_layout()
st.pyplot(fig)
def display_alarm_state_evaluation(df: pd.DataFrame) -> None:
st.write("Alarm State Evaluation")
st.dataframe(df)
def display_key_tables() -> None:
st.write("### Key")
# Symbols
st.write("#### Symbols")
symbol_data = {
"Symbol": ["X", "-", "0"],
"Meaning": [
"Breaching data point: This data point exceeds the threshold.",
"Missing data point: This data point is missing or not reported.",
"Non-breaching data point: This data point is within the threshold."
]
}
symbol_df = pd.DataFrame(symbol_data)
st.table(symbol_df)
# Columns
st.write("#### Columns")
column_data = {
"Column": ["MISSING", "IGNORE", "BREACHING", "NOT BREACHING"],
"Meaning": [
"Action to take when all data points are missing. Possible values: INSUFFICIENT_DATA, Retain current state, ALARM, OK.",
"Action to take when data points are missing but ignored. Possible values: Retain current state, ALARM, OK.",
"Action to take when missing data points are treated as breaching. Possible values: ALARM, OK.",
"Action to take when missing data points are treated as not breaching. Possible values: ALARM, OK."
]
}
column_df = pd.DataFrame(column_data)
st.table(column_df)
# States
st.write("#### States")
state_data = {
"State": ["ALARM", "OK", "Retain current state", "INSUFFICIENT_DATA"],
"Description": [
"Alarm state is triggered.",
"Everything is within the threshold.",
"The current alarm state is maintained.",
"Not enough data to make a determination."
]
}
state_df = pd.DataFrame(state_data)
st.table(state_df)
if __name__ == "__main__":
main()
# File: ./utils.py
import random
from datetime import datetime, timedelta, date, time
import pandas as pd
import numpy as np
from typing import List, Iterator, Dict, Any, Optional
def generate_random_data(
date: date,
start_time: time,
end_time: time,
count: int,
response_time_range: (int, int),
null_percentage: float
) -> pd.DataFrame:
start_datetime: datetime = datetime.combine(date, start_time)
end_datetime: datetime = datetime.combine(date, end_time)
random_timestamps: List[datetime] = [
start_datetime + timedelta(seconds=random.randint(0, int((end_datetime - start_datetime).total_seconds())))
for _ in range(count)
]
random_timestamps.sort()
random_response_times: List[Optional[int]] = [
random.randint(response_time_range[0], response_time_range[1]) for _ in range(count)
]
null_count: int = int(null_percentage * count)
null_indices: List[int] = random.sample(range(count), null_count)
for idx in null_indices:
random_response_times[idx] = None
data: Dict[str, Any] = {
'Timestamp': random_timestamps,
'ResponseTime(ms)': random_response_times
}
df: pd.DataFrame = pd.DataFrame(data)
return df
def calculate_percentile(
df: pd.DataFrame,
freq: str,
percentile: float
) -> pd.DataFrame:
percentile_df: pd.DataFrame = df.groupby(pd.Grouper(key='Timestamp', freq=freq))["ResponseTime(ms)"]\
.quantile(percentile).reset_index(name=f"p{int(percentile * 100)}_ResponseTime(ms)")
percentile_df.replace(to_replace=np.nan, value=None, inplace=True)
return percentile_df
def aggregate_data(
df: pd.DataFrame,
period_length: str
) -> pd.DataFrame:
aggregation_funcs = {
'p50': lambda x: np.percentile(x.dropna(), 50),
'p95': lambda x: np.percentile(x.dropna(), 95),
'p99': lambda x: np.percentile(x.dropna(), 99),
'max': lambda x: np.max(x.dropna()),
'min': lambda x: np.min(x.dropna()),
'average': lambda x: np.mean(x.dropna())
}
summary_df = df.groupby(pd.Grouper(key='Timestamp', freq=period_length)).agg(
p50=('ResponseTime(ms)', aggregation_funcs['p50']),
p95=('ResponseTime(ms)', aggregation_funcs['p95']),
p99=('ResponseTime(ms)', aggregation_funcs['p99']),
max=('ResponseTime(ms)', aggregation_funcs['max']),
min=('ResponseTime(ms)', aggregation_funcs['min']),
average=('ResponseTime(ms)', aggregation_funcs['average']),
).reset_index()
return summary_df
def chunk_list(input_list: List[Any], size: int = 3) -> Iterator[List[Any]]:
while input_list:
chunk: List[Any] = input_list[:size]
yield chunk
input_list = input_list[size:]
def evaluate_alarm_state(
summary_df: pd.DataFrame,
threshold: int,
datapoints_to_alarm: int,
evaluation_range: int,
aggregation_function: str,
alarm_condition: str
) -> pd.DataFrame:
data_points: List[Optional[float]] = list(summary_df[aggregation_function].values)
data_table_dict: Dict[str, List[Any]] = {
"DataPoints": [],
"# of data points that must be filled": [],
"MISSING": [],
"IGNORE": [],
"BREACHING": [],
"NOT BREACHING": []
}
def check_condition(value, threshold, condition):
if condition == '>':
return value > threshold
elif condition == '>=':
return value >= threshold
elif condition == '<':
return value < threshold
elif condition == '<=':
return value <= threshold
for chunk in chunk_list(input_list=data_points, size=evaluation_range):
data_point_repr: str = ''
num_dp_that_must_be_filled: int = 0
for dp in chunk:
if dp is None:
data_point_repr += '-'
elif check_condition(dp, threshold, alarm_condition):
data_point_repr += 'X'
else:
data_point_repr += '0'
if len(chunk) < evaluation_range:
data_point_repr += '-' * (evaluation_range - len(chunk))
if data_point_repr.count('-') > (evaluation_range - datapoints_to_alarm):
num_dp_that_must_be_filled = datapoints_to_alarm - sum([data_point_repr.count('0'), data_point_repr.count('X')])
data_table_dict["DataPoints"].append(data_point_repr)
data_table_dict["# of data points that must be filled"].append(num_dp_that_must_be_filled)
if num_dp_that_must_be_filled > 0:
data_table_dict["MISSING"].append("INSUFFICIENT_DATA" if data_point_repr.count('-') == evaluation_range else "Retain current state")
data_table_dict["IGNORE"].append("Retain current state")
data_table_dict["BREACHING"].append("ALARM")
data_table_dict["NOT BREACHING"].append("OK")
else:
data_table_dict["MISSING"].append("OK")
data_table_dict["IGNORE"].append("Retain current state")
data_table_dict["BREACHING"].append("ALARM" if 'X' * datapoints_to_alarm in data_point_repr else "OK")
data_table_dict["NOT BREACHING"].append("ALARM" if '0' * datapoints_to_alarm not in data_point_repr else "OK")
return pd.DataFrame(data_table_dict)