import streamlit as st
import pandas as pd
from shapely.geometry import Point
import folium
import plotly.graph_objects as go
from ErrorHandler import ErrorHandler
class Visualization:
def __init__(self, df):
self.weekdays = ['Montag', 'Dienstag', 'Mittwoch', 'Donnerstag', 'Freitag', 'Samstag', 'Sonntag']
# time series metrics
self.ts_metrics = {'Anomaliegrad':'anomaly', 'Anzahl Durchfahrten':'ride_id', 'Bremsungen':'breaks',
'Erschütterungsgrad':'logVScore', 'Geschwindigkeit':'speed', 'Normalisierte Geschwindigkeit':'norm_speed',
'Standzeit':'waiting', 'Zeitverlust':'time_loss', 'kumulierte Durchfahrten':'n_rides'}
# metrics for hour and weekday plot
self.hw_metrics = {'Anzahl Durchfahrten':'ride_id', 'Bremsungen':'breaks', 'Geschwindigkeit':'speed',
'Normalisierte Geschwindigkeit':'norm_speed', 'Standzeit':'waiting', 'Zeitverlust':'time_loss'}
#
self.df:pd.DataFrame = df
self.min_rides = 10
if 'from_date' not in st.session_state:
self.init_session_state(df)
self.errorhandler = ErrorHandler()
def init_session_state(self, df):
"""
Method for initialisation of the session state for the dashboard
"""
st.session_state['min_date'] = pd.to_datetime(df['date'].min())
st.session_state['from_date'] = pd.to_datetime(df['date'].min())
st.session_state['max_date'] = pd.to_datetime(df['date'].max())
st.session_state['to_date'] = pd.to_datetime(df['date'].max())
st.session_state['agg_level'] = 7 #days
st.session_state['max_level'] = (st.session_state['to_date']-st.session_state['from_date']).days
def set_date_input_clb(self):
"""
Callback for updating the session state when the date inputs or the aggregation level change
"""
st.session_state['from_date'] = max(st.session_state['min_date'], pd.to_datetime(st.session_state['from_date_input']))
st.session_state['to_date'] = min(st.session_state['max_date'], pd.to_datetime(st.session_state['to_date_input']))
st.session_state['max_level'] = (st.session_state['to_date']-st.session_state['from_date']).days
if ('input_agg_level' in st.session_state) and (st.session_state['max_level'] > st.session_state['input_agg_level']):
st.session_state['agg_level'] = st.session_state['input_agg_level']
else:
st.session_state['agg_level'] = st.session_state['max_level']
def filter_component(self, st_obj=st):
"""
Method for building the filter component of the dashboard
:param st_obj: streamlit object used for placing the component
"""
# build filter container
container = st_obj.container(border=True)
cols = container.columns([5,2,1,1])
# interval filter
self.selected_time = cols[0].radio('Zeitbereich:', ['Zeitinterval', 'Tageszeit', 'Wochentage'], horizontal=True)
self.agg_level = 0
if self.selected_time == 'Zeitinterval':
# date aggregation level
cols[1].number_input('Zeitinterval in Tagen',
value=st.session_state['agg_level'], step=1,
min_value=1, max_value=st.session_state['max_level'],
key='input_agg_level', on_change=self.set_date_input_clb,
help=self.helper_aggregation_level())
self.agg_level = st.session_state['agg_level']
# time range filter
cols[2].date_input('Von', value = st.session_state['from_date'],
min_value=st.session_state['min_date'],
max_value=st.session_state['to_date']-pd.Timedelta(f"{self.agg_level}d"),
format="DD.MM.YYYY", key='from_date_input', on_change=self.set_date_input_clb,
help=self.helper_date_selection('Start'))
cols[3].date_input('Bis', value = st.session_state['to_date'],
min_value=st.session_state['from_date']+pd.Timedelta(f"{self.agg_level}d") ,
max_value=st.session_state['max_date'],
format="DD.MM.YYYY", key='to_date_input', on_change=self.set_date_input_clb,
help=self.helper_date_selection('End'))
# select metric set and time column depending on the chosen time view
metric_index = 0
if self.selected_time == 'Zeitinterval':
metrics = self.ts_metrics
self.time_column = 'date'
metric_index = 3
elif self.selected_time == 'Wochentage':
metrics = self.hw_metrics
self.time_column = 'weekday'
else:
metrics = self.hw_metrics
self.time_column = 'hour'
# metric filter
self.selected_metric = container.radio('Metriken:', metrics, horizontal=True, index=metric_index)
self.selected_key = metrics[self.selected_metric]
# validate user input
self.errorhandler.time_input_validation(st.session_state['from_date'] , st.session_state['to_date'], self.agg_level)
def aggregate_data(self):
"""
Method for filtering and aggregating the data for visualization
"""
# select data in the selected time range
data = self.df[self.df['date'].between(st.session_state['from_date'] , st.session_state['to_date'])].reset_index(drop=True)
# aggregate data on time level
if self.selected_time == 'Zeitinterval':
data = data.groupby(['direction'])
data = data.resample(f'{self.agg_level}d', on='date', origin=st.session_state['min_date'])
elif self.selected_time == 'Tageszeit':
data = data.groupby(['direction', 'hour'])
else:
data = data.groupby(['direction', 'weekday'])
self.gdf_median = self.aggregate_single_dataframe(data)
def fill_gaps_in_plotrange(self, df:pd.DataFrame):
"""
Method for filling missing time steps in the plot range of the aggregated data
:param df: dataframe with data
:return df: dataframe with gaps filled
"""
# weekday, hour and date selection
if self.selected_time == 'Wochentage':
plot = pd.DataFrame(self.weekdays, columns=['weekday_names'], index=range(7))
plot['number'] = range(7)
df = df.merge(plot, left_on='weekday', right_on='number', how='outer')
elif self.selected_time == 'Tageszeit':
plot = pd.DataFrame(range(24), columns=['number'], index=range(24))
df = df.merge(plot, left_on='hour', right_on='number', how='outer')
else:
date_series = pd.date_range(st.session_state['min_date'], st.session_state['max_date'], freq=f'{self.agg_level}d')
plot = pd.DataFrame(date_series, columns=['number'])
df = df.merge(plot, left_on='date', right_on='number', how='outer')
# fill missing data and sort values for correct visualization
df[self.time_column] = df['number']
df.sort_values('number', inplace=True, ignore_index=True)
df['direction'] = df['direction'].ffill().bfill()
df['n_rides'] = df['n_rides'].ffill()
df.fillna(0, inplace=True)
if 'weekday' in df.columns:
df['weekday'] = df['weekday'].apply(self.transform_weekdays)
return df
def aggregate_single_dataframe(self, gdf:pd.DataFrame):
"""
Method for aggregating a grouped dataframe over the selected time interval
:param gdf: grouped dataframe with data of the shapes
:return gdf: dataframe with aggregated metrics per time step
"""
# metric aggregation
gdf = gdf.agg({'speed':'mean', 'norm_speed':'mean', 'breaks':'mean', 'logVScore':'mean',
'anomaly':'mean', 'ride_id':'sum', 'time_loss':'mean', 'waiting':'mean'})
gdf.reset_index(inplace=True)
gdf.sort_values(by=self.time_column, ascending=True, inplace=True, ignore_index=True)
gdf['n_rides'] = gdf['ride_id'].cumsum()
return gdf
def q25(self, x:pd.Series):
return x.quantile(.25)
def q75(self, x:pd.Series):
return x.quantile(.75)
def transform_weekdays(self, x):
"""
Method for transforming the value of the weekday into a string with the name
:param x: value of the weekday
:return: name of the weekday
"""
return self.weekdays[int(x)]
def create_map(self, gdf):
"""
Method for creating a folium map with choropleth layer of a geodataframe
:param gdf: geodataframe with node geometries
:return m: folium map object
"""
point = gdf.at[0,'geometry'].centroid
m = folium.Map(location=[point.y, point.x], zoom_start=16, tiles='CartoDB Positron')
folium.Choropleth(gdf.at[0,'geometry'], fill_color='blue', line_color='blue').add_to(m)
folium.FitOverlays().add_to(m)
return m
def timeline_plot_plotly(self):
"""
Method for creating a plotly time series plot for metric visualization
:return fig: plotly figure
"""
if self.selected_time == 'Zeitinterval':
width = pd.Timedelta(f'{self.agg_level/2}d')
else:
width = .5
# create an empty plotly figure
fig = go.Figure()
directions = self.gdf_median['direction'].unique()
colors = ['blue', 'green', 'red']
low_rides_dates = []
value_bounds = [[], []]
for i, d in enumerate(directions):
# select driving direction and fill missing data
df = self.gdf_median[self.gdf_median['direction']==d].reset_index(drop=True)
df = self.fill_gaps_in_plotrange(df)
# track boundary values for the background plot
idx = df[df['ride_id'] < self.min_rides].index
if not idx.empty:
low_rides_dates.extend(df.loc[idx, 'number'].to_list())
value_bounds[0].append(df[self.selected_key].min())
value_bounds[1].append(df[self.selected_key].max())
# plot direction based metric data
fig.add_trace(go.Scatter(x=df[self.time_column], y=df[self.selected_key],
line_shape='hvh', line=dict(color=colors[i]),
name=f'Fahrtrichtung {d}'))
# add shaded regions for data points with a low number of rides
if self.selected_metric != 'kumulierte Durchfahrten':
min_value = min(value_bounds[0])
max_value = max(value_bounds[1])
for date in set(low_rides_dates):
fig.add_shape(dict(type="rect",
x0=date-width, y0=min_value-abs(.25*min_value),
x1=date+width, y1=max_value+abs(.25*max_value),
fillcolor="red", opacity=0.2,
layer="below", line_width=0))
# layout adjustments
fig.update_layout(
xaxis_title=self.selected_time,
yaxis_title=self.selected_metric,
legend=dict( y=1.02, xanchor="right", x=1, traceorder="normal", orientation="h"),
showlegend=True,
xaxis_range=[st.session_state['from_date'], st.session_state['to_date']]
)
return fig
def int2str(self, x):
# format an integer with '.' as thousands separator (German number format)
return f"{int(x):_}".replace('_', '.')
def float2str(self, x):
# format a float with ',' as decimal and '.' as thousands separator (German number format)
return f"{float(x):_.2f}".replace('.', ',').replace('_', '.')
def helper_plot_regions(self,):
return 'Rot markierte Bereiche zeigen Zeitbereiche an, deren Fahrtenanzahl zu gering ist um \
eine gute Einschätzung der gewählten Metrik abzubilden.'
def helper_aggregation_level(self,):
return 'Zeitbereich für die Zusammenfassung von Fahrten in der Zeitinterval Darstellung.'
def helper_date_selection(self, key):
return f"{key}datum des dargestellten Zeitbereiches."
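# ---------------------------------------------------------------------------
# Minimal usage sketch (assumption): one way the Visualization class could be
# wired into a Streamlit page. The file names, the geopandas/streamlit_folium
# dependencies and the column layout of the input dataframe are hypothetical
# and not part of the original module; the dataframe is expected to provide
# the columns used above ('date', 'direction', 'hour', 'weekday', 'speed', ...).
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    import geopandas as gpd
    from streamlit_folium import st_folium

    rides = pd.read_parquet('rides.parquet')    # hypothetical ride metrics
    shapes = gpd.read_file('shapes.geojson')    # hypothetical node geometries

    vis = Visualization(rides)
    vis.filter_component()                      # render the filter widgets
    vis.aggregate_data()                        # aggregate according to the selected filters
    st.plotly_chart(vis.timeline_plot_plotly(), use_container_width=True)
    st_folium(vis.create_map(shapes))           # render the folium map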