|
import folium |
|
import matplotlib.pyplot as plt |
|
import streamlit as st |
|
import numpy as np |
|
import pandas as pd |
|
import matplotlib as mpl |
|
import bezier |
|
|
|
class Visualization: |
|
""" |
|
Class for handling all visualization of the streamlit dashboard |
|
""" |
|
def __init__(self,): |
|
self.cmap = mpl.colors.ListedColormap(['green', 'lightgreen', 'yellow', 'orange','red']) |
|
self.cmap.set_over('darkred') |
|
self.bounds = [0,10, 20, 30, 45, 60] |
|
self.final_result = ['A', 'B', 'C', 'D', 'E', 'F'] |
|
self.norm = mpl.colors.BoundaryNorm(self.bounds, self.cmap.N) |
|
self.s_m = mpl.cm.ScalarMappable(cmap=self.cmap, norm=self.norm,) |
|
self.min_ride_percentage = 0.05 |
|
if 'node_id' not in st.session_state: |
|
st.session_state['node_id'] = 31334646 |
|
st.session_state['extension_radius'] = 20 |
|
|
|
def get_final_result(self, timeloss): |
|
""" |
|
Function for calculating the time loss class in a range from |
|
'A' very good to 'F' very bad |
|
:param timeloss: worst relevant timeloss of node driving directions |
|
:return: timeloss class |
|
""" |
|
final = -1 |
|
for idx in range(1, len(self.bounds)): |
|
if self.bounds[idx-1] < timeloss <=self.bounds[idx]: |
|
return self.final_result[idx-1] |
|
|
|
return self.final_result[final] |
|
|
|
def create_folium_map(self, gdf): |
|
""" |
|
Method for creating a folium map with choropleth layer of a geodataframe |
|
:param gdf: geodataframe with node geometries |
|
:return m: folium map object |
|
""" |
|
point = gdf.at[0,'geometry'].centroid |
|
m = folium.Map(location=[point.y, point.x], zoom_start=18) |
|
folium.Choropleth(gdf.at[0,'geometry'], fill_color='blue', line_color='blue').add_to(m) |
|
folium.Choropleth(gdf.at[1,'geometry'], fill_color='blue', line_color='blue', fill_opacity=.3).add_to(m) |
|
return m |
|
|
|
@st.cache_data |
|
def get_diretion_results(_self, directs, feature): |
|
""" |
|
Method for creating a list of the directions plots and direction data in a dataframe |
|
:param directs: in/out directions of the node |
|
:param feature: time data feature of the node |
|
:return figures: list with figure objects |
|
:return feature: list with dataframes of time feature |
|
""" |
|
figures, results =[], [] |
|
|
|
for dir in directs: |
|
|
|
fig_loss, result = _self.direction_plot(feature, directs, dir) |
|
figures.append(fig_loss) |
|
results.append(result) |
|
min_loss = _self.get_min_relavent_direction_loss(results) |
|
return figures, results, min_loss |
|
|
|
def get_min_relavent_direction_loss(self, results): |
|
""" |
|
Method for calculating the timeloss of the worst relevant node path |
|
:param results: dataframe with possible driving paths and timelosses |
|
:return: timeloss |
|
""" |
|
total_rides = sum([df[df['Richtung']=='Gesamt']['Fahrten'].sum() for df in results]) |
|
min_rides = self.min_ride_percentage*total_rides |
|
loss_class = [] |
|
for idx, df in enumerate(results): |
|
tmp = df[~(df['Richtung']=='Gesamt') & (df['Fahrten']>=min_rides)].reset_index() |
|
if tmp.empty: |
|
continue |
|
tmp['InDirection'] = idx |
|
loss_class.append(tmp) |
|
loss_class = pd.concat(loss_class, ignore_index=True) |
|
idx = loss_class['Zeitverlust'].argmax() |
|
return loss_class.loc[idx, ['InDirection', 'Zeitverlust', 'Richtung']] |
|
|
|
def direction_plot(self, data, directs, current_key): |
|
""" |
|
Method for generating the time direction plot for the current input direction |
|
:param data: time quality feature for plotting |
|
:param directs: travel directions for input output tracking |
|
:param current_key: key of directs for plotting |
|
:return fig_loss: matplotlib figure object with direction plot informations |
|
:param df_result: dataframe with aggregated information of the time quality feature of the travel direction |
|
:param df_time: datafrme with the aggregated rushhour feature of the travel direction |
|
""" |
|
|
|
df_result = pd.DataFrame(index=range(len(directs)+1),columns=['Richtung', 'Standzeit', 'Zeitverlust', 'Ratio', 'Fahrten']) |
|
|
|
fig_loss, axs_loss = plt.subplots(1,3, figsize=(8,4), gridspec_kw={'width_ratios':[1, 1, 0.15]}) |
|
|
|
angle_in = directs[current_key]['value'] |
|
origin = (180+angle_in)%360 |
|
rad_origin = np.deg2rad(origin) |
|
x_origin, y_origin = np.cos(rad_origin), np.sin(rad_origin) |
|
|
|
total_loss = 0 |
|
total_rides = 0 |
|
total_ratio = 0 |
|
total_waiting = 0 |
|
|
|
for i, dkey_out in enumerate(directs): |
|
angle_diff = abs(directs[current_key]['value'] - directs[dkey_out]['value']) |
|
|
|
if (160<angle_diff<200) or (160<(360+angle_diff)%360 <200): |
|
continue |
|
|
|
angle_out = directs[dkey_out]['value'] |
|
rad_out = np.deg2rad(angle_out) |
|
|
|
timeloss = data['TimeLoss'].at[int(current_key), int(dkey_out)] |
|
rides = data['RideCount'].at[int(current_key), int(dkey_out)] |
|
waiting = data['StandingTime'].at[int(current_key), int(dkey_out)] |
|
ratio = waiting/(timeloss+1e-7) |
|
|
|
df_result.at[i, 'Richtung'] = f"{directs[dkey_out]['name']}" |
|
df_result.at[i, 'Zeitverlust'] = np.round(timeloss,2) |
|
df_result.at[i, 'Standzeit'] = np.round(waiting,2) |
|
df_result.at[i, 'Fahrten'] = rides |
|
df_result.at[i, 'Ratio'] = np.round(ratio,2) |
|
total_loss += rides*timeloss |
|
total_rides += rides |
|
total_ratio += rides*ratio |
|
total_waiting += rides*waiting |
|
|
|
lw = np.min([7, rides/10]) |
|
lw = np.max([lw, .5]) |
|
|
|
x_out = np.cos(rad_out) |
|
y_out = np.sin(rad_out) |
|
x_bezier, y_bezier = self.get_bezier_curve_in_polar([x_origin, y_origin],[x_out, y_out]) |
|
|
|
x = x_out-np.sign(x_out)*.1 |
|
y = y_out-np.sign(y_out)*.1 |
|
|
|
|
|
if rides > 0: |
|
color_loss = self.s_m.to_rgba(float(timeloss)) |
|
color_waiting = self.s_m.to_rgba(float(waiting)) |
|
else: |
|
color_loss = 'gray' |
|
color_waiting = 'gray' |
|
im = axs_loss[0].plot(x_bezier,y_bezier, color=color_loss, lw=lw) |
|
|
|
im = axs_loss[1].plot(x_bezier,y_bezier, color=color_waiting, lw=lw) |
|
|
|
axs_loss[0].text(x,y, f"{directs[dkey_out]['name']}") |
|
axs_loss[1].text(x,y, f"{directs[dkey_out]['name']}") |
|
|
|
cb1 = mpl.colorbar.ColorbarBase(axs_loss[2], |
|
cmap=self.cmap, |
|
norm=self.norm, |
|
extend='max', |
|
) |
|
cb1.set_label('Zeitverlustklassen [s]', loc='center') |
|
|
|
|
|
for a in range(2): |
|
|
|
axs_loss[a].plot(*self.get_unit_circle_data(), color='gray', lw=3, alpha=0.4) |
|
|
|
axs_loss[a].arrow(x_origin, y_origin, -.6*x_origin, -.6*y_origin, width=.07, color='k', alpha=.3) |
|
|
|
axs_loss[a].set_ylim([-1.21,1.01]) |
|
axs_loss[a].set_xlim([-1.01,1.01]) |
|
axs_loss[a].axis('off') |
|
|
|
axs_loss[0].set_title(f'Zeitverlust') |
|
axs_loss[1].set_title(f'Standzeit') |
|
|
|
df_result.dropna(inplace=True) |
|
|
|
df_result.at[i+1, 'Richtung'] = f'Gesamt' |
|
df_result.at[i+1, 'Fahrten'] = total_rides |
|
if total_rides >0: |
|
df_result.at[i+1, 'Zeitverlust'] = np.round(total_loss/total_rides, 2) |
|
df_result.at[i+1, 'Standzeit'] = np.round(total_waiting/total_rides, 2) |
|
df_result.at[i+1, 'Ratio'] = np.round(total_ratio/total_rides, 2) |
|
else: |
|
df_result.fillna(0, inplace=True) |
|
|
|
return fig_loss, df_result |
|
|
|
def get_bezier_curve_in_polar(self, p0, p1, n=15): |
|
""" |
|
Method for calculation a bezier curve given two points with the intersection point (0,0) |
|
:param p0: point coordination of the staring point |
|
:param p1: point coordinates of the end point |
|
:param n: number of points for the line |
|
""" |
|
|
|
norm0 = 1/np.sum(np.array(p0)**2)**.5 |
|
norm1 = 1/np.sum(np.array(p1)**2)**.5 |
|
x = [norm0*p0[0],0, norm1*p1[0]] |
|
y = [norm0*p0[1],0, norm1*p1[1]] |
|
|
|
curve = bezier.Curve(np.asfortranarray([x,y]), degree=2) |
|
|
|
points = np.linspace(0,1,n,endpoint=True) |
|
data = [] |
|
for p in points: |
|
data.append(curve.evaluate(float(p))) |
|
|
|
data = np.array(data).reshape(n,2) |
|
return data[:,0], data[:,1] |
|
|
|
def get_unit_circle_data(self, n=50): |
|
""" |
|
method for generating data on the unit circle |
|
:param n: number of equidistance points on the unit circle |
|
:return x: x coordinates of the points on the unit circle |
|
:return y: y coordinates of the poits on the unit cirlce |
|
""" |
|
alpha = np.linspace(0, 2*np.pi, n, endpoint=True) |
|
|
|
return np.cos(alpha), np.sin(alpha) |
|
|
|
def get_metric_color(self, value): |
|
""" |
|
Method for getting the color of the time loss or waiting time class |
|
:param value: feaute value in seconds |
|
:return color: string of class color |
|
""" |
|
if value < self.bounds[2]: |
|
return 'green' |
|
elif self.bounds[2] <= value < self.bounds[4]: |
|
return 'orange' |
|
else: |
|
return 'red' |
|
|
|
def int2str(self, x): |
|
return f"{int(x):_}".replace('_', '.') |
|
|
|
def float2str(self, x): |
|
return f"{float(x):_.2f}".replace('.', ',').replace('_', '.') |