import folium
import matplotlib.pyplot as plt
import streamlit as st
import numpy as np
import pandas as pd
import matplotlib as mpl
import bezier
class Visualization:
Class for handling all visualization of the streamlit dashboard
def __init__(self,):
self.cmap = mpl.colors.ListedColormap(['green', 'lightgreen', 'yellow', 'orange','red'])
self.bounds = [0,10, 20, 30, 45, 60]
self.final_result = ['A', 'B', 'C', 'D', 'E', 'F']
self.norm = mpl.colors.BoundaryNorm(self.bounds, self.cmap.N)
self.s_m =, norm=self.norm,)
self.min_ride_percentage = 0.05
if 'node_id' not in st.session_state:
st.session_state['node_id'] = 31334646
st.session_state['extension_radius'] = 20
def get_final_result(self, timeloss):
Function for calculating the time loss class in a range from
'A' very good to 'F' very bad
:param timeloss: worst relevant timeloss of node driving directions
:return: timeloss class
final = -1
for idx in range(1, len(self.bounds)):
if self.bounds[idx-1] < timeloss <=self.bounds[idx]:
return self.final_result[idx-1]
return self.final_result[final]
def create_folium_map(self, gdf):
Method for creating a folium map with choropleth layer of a geodataframe
:param gdf: geodataframe with node geometries
:return m: folium map object
point =[0,'geometry'].centroid
m = folium.Map(location=[point.y, point.x], zoom_start=18)
folium.Choropleth([0,'geometry'], fill_color='blue', line_color='blue').add_to(m)
folium.Choropleth([1,'geometry'], fill_color='blue', line_color='blue', fill_opacity=.3).add_to(m)
return m
def get_diretion_results(_self, directs, feature):
Method for creating a list of the directions plots and direction data in a dataframe
:param directs: in/out directions of the node
:param feature: time data feature of the node
:return figures: list with figure objects
:return feature: list with dataframes of time feature
figures, results =[], []
# go over all directions and create direction plot and table data
for dir in directs:
# get plotting feature of travel direction
fig_loss, result = _self.direction_plot(feature, directs, dir)
min_loss = _self.get_min_relavent_direction_loss(results)
return figures, results, min_loss
def get_min_relavent_direction_loss(self, results):
Method for calculating the timeloss of the worst relevant node path
:param results: dataframe with possible driving paths and timelosses
:return: timeloss
total_rides = sum([df[df['Richtung']=='Gesamt']['Fahrten'].sum() for df in results])
min_rides = self.min_ride_percentage*total_rides
loss_class = []
for idx, df in enumerate(results):
tmp = df[~(df['Richtung']=='Gesamt') & (df['Fahrten']>=min_rides)].reset_index()
if tmp.empty:
tmp['InDirection'] = idx
loss_class = pd.concat(loss_class, ignore_index=True)
idx = loss_class['Zeitverlust'].argmax()
return loss_class.loc[idx, ['InDirection', 'Zeitverlust', 'Richtung']]
def direction_plot(self, data, directs, current_key):
Method for generating the time direction plot for the current input direction
:param data: time quality feature for plotting
:param directs: travel directions for input output tracking
:param current_key: key of directs for plotting
:return fig_loss: matplotlib figure object with direction plot informations
:param df_result: dataframe with aggregated information of the time quality feature of the travel direction
:param df_time: datafrme with the aggregated rushhour feature of the travel direction
# initialise dataframes for displaying feature in dataframe format
df_result = pd.DataFrame(index=range(len(directs)+1),columns=['Richtung', 'Standzeit', 'Zeitverlust', 'Ratio', 'Fahrten'])
# define maplotlib figure and axes objects for direction plots
fig_loss, axs_loss = plt.subplots(1,3, figsize=(8,4), gridspec_kw={'width_ratios':[1, 1, 0.15]})
# setup input angle and transform data into the radians and the projection onto the unit circle
angle_in = directs[current_key]['value']
origin = (180+angle_in)%360
rad_origin = np.deg2rad(origin)
x_origin, y_origin = np.cos(rad_origin), np.sin(rad_origin)
# initialise tracking parameter for total time quality feature estimation of the travel directions
total_loss = 0
total_rides = 0
total_ratio = 0
total_waiting = 0
# loop over all out directions and aggregated data as well as plotting
for i, dkey_out in enumerate(directs):
angle_diff = abs(directs[current_key]['value'] - directs[dkey_out]['value'])
# skip u-turns on node
if (160<angle_diff<200) or (160<(360+angle_diff)%360 <200):
# transform output angles
angle_out = directs[dkey_out]['value']
rad_out = np.deg2rad(angle_out)
# get time quality data of input out combination
timeloss = data['TimeLoss'].at[int(current_key), int(dkey_out)]
rides = data['RideCount'].at[int(current_key), int(dkey_out)]
waiting = data['StandingTime'].at[int(current_key), int(dkey_out)]
ratio = waiting/(timeloss+1e-7)
# save aggregated time quality data in dataframe[i, 'Richtung'] = f"{directs[dkey_out]['name']}"[i, 'Zeitverlust'] = np.round(timeloss,2)[i, 'Standzeit'] = np.round(waiting,2)[i, 'Fahrten'] = rides[i, 'Ratio'] = np.round(ratio,2)
total_loss += rides*timeloss
total_rides += rides
total_ratio += rides*ratio
total_waiting += rides*waiting
# setup plotting parameter for ride count coding and bezier curve traveling direction visualization
lw = np.min([7, rides/10])
lw = np.max([lw, .5])
# bezier plot
x_out = np.cos(rad_out)
y_out = np.sin(rad_out)
x_bezier, y_bezier = self.get_bezier_curve_in_polar([x_origin, y_origin],[x_out, y_out])
# offset for direction label
x = x_out-np.sign(x_out)*.1
y = y_out-np.sign(y_out)*.1
# plot movement direction curves
# timeloss plot
if rides > 0:
color_loss = self.s_m.to_rgba(float(timeloss))
color_waiting = self.s_m.to_rgba(float(waiting))
color_loss = 'gray'
color_waiting = 'gray'
im = axs_loss[0].plot(x_bezier,y_bezier, color=color_loss, lw=lw)
# waiting time plot
im = axs_loss[1].plot(x_bezier,y_bezier, color=color_waiting, lw=lw)
# display direction label as text plot
axs_loss[0].text(x,y, f"{directs[dkey_out]['name']}")
axs_loss[1].text(x,y, f"{directs[dkey_out]['name']}")
# setup colorbar of the loss classes
cb1 = mpl.colorbar.ColorbarBase(axs_loss[2],
cb1.set_label('Zeitverlustklassen [s]', loc='center')
# plot arrow of input direction, set x and y limits and set axis off
# and plot unit circle for boundaries
for a in range(2):
# plot node boundatries as unit circle
axs_loss[a].plot(*self.get_unit_circle_data(), color='gray', lw=3, alpha=0.4)
# arrow for input direction
axs_loss[a].arrow(x_origin, y_origin, -.6*x_origin, -.6*y_origin, width=.07, color='k', alpha=.3)
# set boundaries and plot box
# set plot titles
# drop not used columns from dataframe
# calculate mean resulting feature of all directions for time quality feature[i+1, 'Richtung'] = f'Gesamt'[i+1, 'Fahrten'] = total_rides
if total_rides >0:[i+1, 'Zeitverlust'] = np.round(total_loss/total_rides, 2)[i+1, 'Standzeit'] = np.round(total_waiting/total_rides, 2)[i+1, 'Ratio'] = np.round(total_ratio/total_rides, 2)
df_result.fillna(0, inplace=True)
return fig_loss, df_result
def get_bezier_curve_in_polar(self, p0, p1, n=15):
Method for calculation a bezier curve given two points with the intersection point (0,0)
:param p0: point coordination of the staring point
:param p1: point coordinates of the end point
:param n: number of points for the line
# start, end point handling for transformation from unit squre to unit circle
norm0 = 1/np.sum(np.array(p0)**2)**.5
norm1 = 1/np.sum(np.array(p1)**2)**.5
x = [norm0*p0[0],0, norm1*p1[0]]
y = [norm0*p0[1],0, norm1*p1[1]]
# calculate bezier curve
curve = bezier.Curve(np.asfortranarray([x,y]), degree=2)
# get data from bezier curve
points = np.linspace(0,1,n,endpoint=True)
data = []
for p in points:
# reshape data
data = np.array(data).reshape(n,2)
return data[:,0], data[:,1]
def get_unit_circle_data(self, n=50):
method for generating data on the unit circle
:param n: number of equidistance points on the unit circle
:return x: x coordinates of the points on the unit circle
:return y: y coordinates of the poits on the unit cirlce
alpha = np.linspace(0, 2*np.pi, n, endpoint=True)
# return x and y coordinates in polor coordinate system with radius = 1
return np.cos(alpha), np.sin(alpha)
def get_metric_color(self, value):
Method for getting the color of the time loss or waiting time class
:param value: feaute value in seconds
:return color: string of class color
if value < self.bounds[2]:
return 'green'
elif self.bounds[2] <= value < self.bounds[4]:
return 'orange'
return 'red'
def int2str(self, x):
return f"{int(x):_}".replace('_', '.')
def float2str(self, x):
return f"{float(x):_.2f}".replace('.', ',').replace('_', '.')