import streamlit as st
import pandas as pd
from shapely.geometry import Point
import folium
import plotly.graph_objects as go

from ErrorHandler import ErrorHandler


class Visualization:

    def __init__(self, df):
        self.weekdays = ['Montag', 'Dienstag', 'Mittwoch', 'Donnerstag', 'Freitag', 'Samstag', 'Sonntag']
        # time series metrics
        self.ts_metrics = {'Anomaliegrad': 'anomaly',
                           'Anzahl Durchfahrten': 'ride_id',
                           'Bremsungen': 'breaks',
                           'Erschütterungsgrad': 'logVScore',
                           'Geschwindigkeit': 'speed',
                           'Normalisierte Geschwindigkeit': 'norm_speed',
                           'Standzeit': 'waiting',
                           'Zeitverlust': 'time_loss',
                           'kumulierte Durchfahrten': 'n_rides'}
        # metrics for hour and weekday plot
        self.hw_metrics = {'Anzahl Durchfahrten': 'ride_id',
                           'Bremsungen': 'breaks',
                           'Geschwindigkeit': 'speed',
                           'Normalisierte Geschwindigkeit': 'norm_speed',
                           'Standzeit': 'waiting',
                           'Zeitverlust': 'time_loss'}
        self.df: pd.DataFrame = df
        # minimum number of rides per interval for a reliable metric estimate
        self.min_rides = 10
        if 'from_date' not in st.session_state:
            self.init_session_state(df)
        self.errorhandler = ErrorHandler()

    def init_session_state(self, df):
        """
        Method for initialisation of the session state for the dashboard
        """
        st.session_state['min_date'] = pd.to_datetime(df['date'].min())
        st.session_state['from_date'] = pd.to_datetime(df['date'].min())
        st.session_state['max_date'] = pd.to_datetime(df['date'].max())
        st.session_state['to_date'] = pd.to_datetime(df['date'].max())
        st.session_state['agg_level'] = 7  # days
        st.session_state['max_level'] = (st.session_state['to_date'] - st.session_state['from_date']).days

    def set_date_input_clb(self):
        """
        Callback for syncing the date and aggregation level inputs with the session state
        """
        st.session_state['from_date'] = max(st.session_state['min_date'],
                                            pd.to_datetime(st.session_state['from_date_input']))
        st.session_state['to_date'] = min(st.session_state['max_date'],
                                          pd.to_datetime(st.session_state['to_date_input']))
        st.session_state['max_level'] = (st.session_state['to_date'] - st.session_state['from_date']).days
        if ('input_agg_level' in st.session_state) and (st.session_state['max_level'] > st.session_state['input_agg_level']):
            st.session_state['agg_level'] = st.session_state['input_agg_level']
        else:
            st.session_state['agg_level'] = st.session_state['max_level']

    def filter_component(self, st_obj=st):
        """
        Filter module method
        :param st_obj: streamlit placing object
        """
        # build filter container
        container = st_obj.container(border=True)
        cols = container.columns([5, 2, 1, 1])
        # interval filter
        self.selected_time = cols[0].radio('Zeitbereich:', ['Zeitinterval', 'Tageszeit', 'Wochentage'], horizontal=True)
        self.agg_level = 0
        if self.selected_time == 'Zeitinterval':
            # date aggregation level
            cols[1].number_input('Zeitinterval in Tagen', value=st.session_state['agg_level'], step=1,
                                 min_value=1, max_value=st.session_state['max_level'],
                                 key='input_agg_level', on_change=self.set_date_input_clb,
                                 help=self.helper_aggregation_level())
            self.agg_level = st.session_state['agg_level']
        # time range filter
        cols[2].date_input('Von', value=st.session_state['from_date'],
                           min_value=st.session_state['min_date'],
                           max_value=st.session_state['to_date'] - pd.Timedelta(f"{self.agg_level}d"),
                           format="DD.MM.YYYY", key='from_date_input',
                           on_change=self.set_date_input_clb, help=self.helper_date_selection('Start'))
        cols[3].date_input('Bis', value=st.session_state['to_date'],
                           min_value=st.session_state['from_date'] + pd.Timedelta(f"{self.agg_level}d"),
                           max_value=st.session_state['max_date'],
                           format="DD.MM.YYYY", key='to_date_input',
                           on_change=self.set_date_input_clb, help=self.helper_date_selection('End'))
        # metric filter
        metric_index = 0
        if self.selected_time == 'Zeitinterval':
            metrics = self.ts_metrics
            self.time_column = 'date'
            metric_index = 3
        elif self.selected_time == 'Wochentage':
            metrics = self.hw_metrics
            self.time_column = 'weekday'
        else:
            metrics = self.hw_metrics
            self.time_column = 'hour'
        # metric filter
        self.selected_metric = container.radio('Metriken:', metrics, horizontal=True, index=metric_index)
        self.selected_key = metrics[self.selected_metric]
        # control user input
        self.errorhandler.time_input_validation(st.session_state['from_date'], st.session_state['to_date'], self.agg_level)

    def aggregate_data(self):
        """
        Method for filtering and aggregating the data for visualization
        """
        # select data in the selected time range
        data = self.df[self.df['date'].between(st.session_state['from_date'], st.session_state['to_date'])].reset_index(drop=True)
        # aggregate data on the selected time level
        if self.selected_time == 'Zeitinterval':
            data = data.groupby(['direction'])
            data = data.resample(f'{self.agg_level}d', on='date', origin=st.session_state['min_date'])
        elif self.selected_time == 'Tageszeit':
            data = data.groupby(['direction', 'hour'])
        else:
            data = data.groupby(['direction', 'weekday'])
        self.gdf_median = self.aggregate_single_dataframe(data)

    def fill_gaps_in_plotrange(self, df: pd.DataFrame):
        """
        Method for filling gaps (missing weekdays, hours or dates) in the plot range
        :param df: dataframe with data
        :return df: dataframe without gaps in the plot range
        """
        # weekday, hour and date selection
        if self.selected_time == 'Wochentage':
            plot = pd.DataFrame(self.weekdays, columns=['weekday_names'], index=range(7))
            plot['number'] = range(7)
            df = df.merge(plot, left_on='weekday', right_on='number', how='outer')
        elif self.selected_time == 'Tageszeit':
            plot = pd.DataFrame(range(24), columns=['number'], index=range(24))
            df = df.merge(plot, left_on='hour', right_on='number', how='outer')
        else:
            date_series = pd.date_range(st.session_state['min_date'], st.session_state['max_date'], freq=f'{self.agg_level}d')
            plot = pd.DataFrame(date_series, columns=['number'])
            df = df.merge(plot, left_on='date', right_on='number', how='outer')
        # fill missing data and sort values for correct visualization
        df[self.time_column] = df['number']
        df.sort_values('number', inplace=True, ignore_index=True)
        df['direction'] = df['direction'].ffill().bfill()
        df['n_rides'] = df['n_rides'].ffill()
        df.fillna(0, inplace=True)
        if 'weekday' in df.columns:
            df['weekday'] = df['weekday'].apply(self.transform_weekdays)
        return df

    def aggregate_single_dataframe(self, gdf: pd.DataFrame):
        """
        Method for aggregating the metrics of a grouped dataframe over the selected time level
        :param gdf: grouped dataframe with data of the shapes
        :return gdf: dataframe with aggregated data
        """
        # metric aggregation
        gdf = gdf.agg({'speed': 'mean', 'norm_speed': 'mean', 'breaks': 'mean', 'logVScore': 'mean',
                       'anomaly': 'mean', 'ride_id': 'sum', 'time_loss': 'mean', 'waiting': 'mean'})
        gdf.reset_index(inplace=True)
        gdf.sort_values(by=self.time_column, ascending=True, inplace=True, ignore_index=True)
        gdf['n_rides'] = gdf['ride_id'].cumsum()
        return gdf

    def q25(self, x: pd.Series):
        return x.quantile(.25)

    def q75(self, x: pd.Series):
        return x.quantile(.75)

    def transform_weekdays(self, x):
        """
        Method for transforming the numeric value of the weekday into its name
        :param x: value of the weekday
        :return: name of the weekday
        """
        return self.weekdays[x]

    def create_map(self, gdf):
        """
        Method for creating a folium map with a choropleth layer of a geodataframe
        :param gdf: geodataframe with node geometries
        :return m: folium map object
        """
        point = gdf.at[0, 'geometry'].centroid
        m = folium.Map(location=[point.y, point.x], zoom_start=16, tiles='CartoDB Positron')
        folium.Choropleth(gdf.at[0, 'geometry'], fill_color='blue', line_color='blue').add_to(m)
        folium.FitOverlays().add_to(m)
        return m

    def timeline_plot_plotly(self):
        """
        Method for creating a plotly time series plot for metric visualization
        :return fig: plotly figure
        """
        # half width of the background rectangles marking intervals with few rides
        if self.selected_time == 'Zeitinterval':
            width = pd.Timedelta(f'{self.agg_level/2}d')
        else:
            width = .5
        # create an empty plotly figure
        fig = go.Figure()
        directions = self.gdf_median['direction'].unique()
        colors = ['blue', 'green', 'red']
        low_rides_dates = []
        value_bounds = [[], []]
        for i, d in enumerate(directions):
            # select driving direction and fill missing data
            df = self.gdf_median[self.gdf_median['direction'] == d].reset_index(drop=True)
            df = self.fill_gaps_in_plotrange(df)
            # track boundary values for the background plot
            idx = df[df['ride_id'] < self.min_rides].index
            if not idx.empty:
                low_rides_dates.extend(df.loc[idx, 'number'].to_list())
            value_bounds[0].append(df[self.selected_key].min())
            value_bounds[1].append(df[self.selected_key].max())
            # plot direction based metric data
            fig.add_trace(go.Scatter(x=df[self.time_column], y=df[self.selected_key],
                                     line_shape='hvh', line=dict(color=colors[i]),
                                     name=f'Fahrtrichtung {d}'))
        # add shaded regions for data points with a low number of rides
        if self.selected_metric != 'kumulierte Durchfahrten':
            min_value = min(value_bounds[0])
            max_value = max(value_bounds[1])
            for date in set(low_rides_dates):
                fig.add_shape(dict(type="rect",
                                   x0=date - width, y0=min_value - abs(.25 * min_value),
                                   x1=date + width, y1=max_value + abs(.25 * max_value),
                                   fillcolor="red", opacity=0.2, layer="below", line_width=0))
        # layout adjustments
        fig.update_layout(
            xaxis_title=self.selected_time,
            yaxis_title=self.selected_metric,
            legend=dict(y=1.02, xanchor="right", x=1, traceorder="normal", orientation="h"),
            showlegend=True)
        if self.selected_time == 'Zeitinterval':
            # restrict the x-axis to the selected date range (only meaningful for the date axis)
            fig.update_layout(xaxis_range=[st.session_state['from_date'], st.session_state['to_date']])
        return fig

    def int2str(self, x):
        """Format an integer with a dot as thousands separator."""
        return f"{int(x):_}".replace('_', '.')

    def float2str(self, x):
        """Format a float with a German decimal comma and a dot as thousands separator."""
        return f"{float(x):_.2f}".replace('.', ',').replace('_', '.')

    def helper_plot_regions(self):
        return 'Rot markierte Bereiche zeigen Zeitbereiche an, deren Fahrtenanzahl zu gering ist, um ' \
               'eine gute Einschätzung der gewählten Metrik abzubilden.'

    def helper_aggregation_level(self):
        return 'Zeitbereich für die Zusammenfassung von Fahrten in der Zeitinterval Darstellung.'

    def helper_date_selection(self, key):
        return f"{key}datum des dargestellten Zeitbereiches."
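
# Minimal usage sketch (assumption, not part of the module itself): the class expects a dataframe
# with at least the columns 'date', 'direction', 'hour', 'weekday', 'geometry' and the metric
# columns referenced above ('speed', 'norm_speed', 'breaks', 'logVScore', 'anomaly', 'ride_id',
# 'time_loss', 'waiting'). The loader function `load_data` is hypothetical.
#
# if __name__ == '__main__':
#     df = load_data()                # hypothetical loader returning the prepared dataframe
#     viz = Visualization(df)
#     viz.filter_component()          # render the filter widgets
#     viz.aggregate_data()            # aggregate according to the selected filters
#     st.plotly_chart(viz.timeline_plot_plotly(), use_container_width=True)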