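# Page-scrape residue kept for provenance (not part of the program):
# author avatar caption "ChristophS's picture"; commit 9b34089,
# commit message "renaming frontend filter" (originally misspelled "reanming fronent filter").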
import folium
import matplotlib.pyplot as plt
import streamlit as st
import numpy as np
import pandas as pd
import matplotlib as mpl
import bezier
class Visualization:
    """
    Class for handling all visualization of the streamlit dashboard.

    Holds the colour scale used for the time-loss classes and offers helpers
    for the folium map, the per-direction matplotlib plots, the aggregated
    result tables and German-style number formatting.
    """

    # Values written to the Streamlit session state on first use.
    DEFAULT_NODE_ID = 31334646
    DEFAULT_EXTENSION_RADIUS = 20

    def __init__(self):
        """
        Set up the colour scale and seed the Streamlit session state.
        """
        # One colour per interval of ``bounds``; values above the last bound
        # are drawn in 'darkred' (see ``set_over``).
        self.cmap = mpl.colors.ListedColormap(['green', 'lightgreen', 'yellow', 'orange', 'red'])
        self.cmap.set_over('darkred')
        # Interval edges in seconds and the class label of each interval;
        # the last label ('F') is used for everything above the last edge.
        self.bounds = [0, 10, 20, 30, 45, 60]
        self.final_result = ['A', 'B', 'C', 'D', 'E', 'F']
        self.norm = mpl.colors.BoundaryNorm(self.bounds, self.cmap.N)
        self.s_m = mpl.cm.ScalarMappable(cmap=self.cmap, norm=self.norm)
        # Paths carrying less than this share of all rides are ignored when
        # the worst relevant path is determined.
        self.min_ride_percentage = 0.05
        # Only seed the session state once so later user input is not overwritten.
        if 'node_id' not in st.session_state:
            st.session_state['node_id'] = self.DEFAULT_NODE_ID
            st.session_state['extension_radius'] = self.DEFAULT_EXTENSION_RADIUS

    def get_final_result(self, timeloss):
        """
        Function for calculating the time loss class in a range from
        'A' very good to 'F' very bad.

        The class is the first interval of ``self.bounds`` whose upper edge is
        not exceeded: up to 10 -> 'A', (10, 20] -> 'B', (20, 30] -> 'C',
        (30, 45] -> 'D', (45, 60] -> 'E', above 60 -> 'F'.

        :param timeloss: worst relevant timeloss of node driving directions
        :return: timeloss class
        """
        # Comparing against the upper edges only means a time loss of exactly
        # 0 (or below) lands in 'A' instead of falling through to 'F'.
        for upper, label in zip(self.bounds[1:], self.final_result):
            if timeloss <= upper:
                return label
        # Above the last edge; NaN also ends up here because every
        # comparison with NaN is False.
        return self.final_result[-1]

    def create_folium_map(self, gdf):
        """
        Method for creating a folium map with choropleth layers of a geodataframe.

        The map is centred on the centroid of the geometry in row 0; the
        geometries of rows 0 and 1 are drawn in blue, row 1 with a lighter fill.

        :param gdf: geodataframe with node geometries (rows labelled 0 and 1 are used)
        :return m: folium map object
        """
        # NOTE(review): presumably row 0 is the node itself and row 1 its
        # extended surrounding area - confirm against the caller.
        point = gdf.at[0, 'geometry'].centroid
        m = folium.Map(location=[point.y, point.x], zoom_start=18)
        folium.Choropleth(gdf.at[0, 'geometry'], fill_color='blue', line_color='blue').add_to(m)
        folium.Choropleth(gdf.at[1, 'geometry'], fill_color='blue', line_color='blue', fill_opacity=.3).add_to(m)
        return m

    @st.cache_data
    def get_diretion_results(_self, directs, feature):
        """
        Method for creating a list of the direction plots and direction data in dataframes.

        The instance parameter is named ``_self`` because of the
        ``st.cache_data`` decorator (NOTE(review): the leading underscore is
        Streamlit's convention for arguments excluded from the cache key -
        confirm against the Streamlit documentation).

        :param directs: in/out directions of the node
        :param feature: time data feature of the node
        :return figures: list with one figure object per input direction
        :return results: list with one result dataframe per input direction
        :return min_loss: worst relevant path, see ``get_min_relavent_direction_loss``
        """
        figures, results = [], []
        # go over all directions and create direction plot and table data
        for direction_key in directs:
            fig_loss, result = _self.direction_plot(feature, directs, direction_key)
            figures.append(fig_loss)
            results.append(result)
        min_loss = _self.get_min_relavent_direction_loss(results)
        return figures, results, min_loss

    def get_min_relavent_direction_loss(self, results):
        """
        Method for calculating the timeloss of the worst relevant node path.

        A path is relevant when it carries at least ``self.min_ride_percentage``
        of all rides of the node. Despite the method name, the path with the
        HIGHEST time loss among the relevant ones is returned.

        :param results: list of dataframes (one per input direction) with the
            possible driving paths, their timelosses and a 'Gesamt' summary row
        :return: series with 'InDirection' (position of the dataframe in
            ``results``), 'Zeitverlust' and 'Richtung' of the worst relevant path
        :raises ValueError: if no path fulfils the ride threshold
        """
        # The 'Gesamt' rows hold the ride totals per input direction.
        total_rides = sum(df.loc[df['Richtung'] == 'Gesamt', 'Fahrten'].sum() for df in results)
        min_rides = self.min_ride_percentage * total_rides
        candidates = []
        for idx, df in enumerate(results):
            relevant = df[(df['Richtung'] != 'Gesamt') & (df['Fahrten'] >= min_rides)].reset_index()
            if relevant.empty:
                continue
            relevant['InDirection'] = idx
            candidates.append(relevant)
        if not candidates:
            # pd.concat would raise an opaque ValueError on an empty list.
            raise ValueError('no driving path fulfils the minimum ride threshold')
        loss_class = pd.concat(candidates, ignore_index=True)
        # The result tables are built cell by cell and therefore hold object
        # columns; convert before argmax, which needs a numeric dtype.
        worst = pd.to_numeric(loss_class['Zeitverlust']).argmax()
        # ignore_index=True gives a RangeIndex, so position and label coincide.
        return loss_class.loc[worst, ['InDirection', 'Zeitverlust', 'Richtung']]

    def direction_plot(self, data, directs, current_key):
        """
        Method for generating the time direction plot for the current input direction.

        Draws one curve per output direction into a time-loss panel and a
        standing-time panel (line width encodes the ride count, colour the
        time class) and collects the plotted values in a table with a
        ride-weighted 'Gesamt' summary row.

        :param data: time quality feature for plotting; the entries 'TimeLoss',
            'RideCount' and 'StandingTime' are read via ``.at[in_key, out_key]``
        :param directs: travel directions for input output tracking; every
            entry provides an angle in degrees ('value') and a label ('name')
        :param current_key: key of directs for plotting (the input direction)
        :return fig_loss: matplotlib figure object with direction plot informations
        :return df_result: dataframe with aggregated information of the time
            quality feature of the travel direction
        """
        # initialise dataframe for displaying feature in dataframe format
        # (one row per output direction plus one summary row)
        df_result = pd.DataFrame(
            index=range(len(directs) + 1),
            columns=['Richtung', 'Standzeit', 'Zeitverlust', 'Ratio', 'Fahrten'],
        )
        # two plot panels plus a narrow axis for the colorbar
        fig_loss, axs_loss = plt.subplots(1, 3, figsize=(8, 4), gridspec_kw={'width_ratios': [1, 1, 0.15]})
        # the curves start opposite of the input angle, projected onto the unit circle
        angle_in = directs[current_key]['value']
        origin = (180 + angle_in) % 360
        rad_origin = np.deg2rad(origin)
        x_origin, y_origin = np.cos(rad_origin), np.sin(rad_origin)
        # ride-weighted sums for the summary row
        total_loss = 0
        total_rides = 0
        total_ratio = 0
        total_waiting = 0
        in_key = int(current_key)
        # loop over all out directions and aggregate data as well as plotting
        for i, dkey_out in enumerate(directs):
            angle_out = directs[dkey_out]['value']
            angle_diff = abs(angle_in - angle_out)
            # skip u-turns on node (directions roughly opposite to the input)
            if 160 < angle_diff % 360 < 200:
                continue
            rad_out = np.deg2rad(angle_out)
            # get time quality data of input out combination
            out_key = int(dkey_out)
            timeloss = data['TimeLoss'].at[in_key, out_key]
            rides = data['RideCount'].at[in_key, out_key]
            waiting = data['StandingTime'].at[in_key, out_key]
            # small epsilon avoids a division by zero for paths without time loss
            ratio = waiting / (timeloss + 1e-7)
            # save aggregated time quality data in dataframe
            label = f"{directs[dkey_out]['name']}"
            df_result.at[i, 'Richtung'] = label
            df_result.at[i, 'Zeitverlust'] = np.round(timeloss, 2)
            df_result.at[i, 'Standzeit'] = np.round(waiting, 2)
            df_result.at[i, 'Fahrten'] = rides
            df_result.at[i, 'Ratio'] = np.round(ratio, 2)
            total_loss += rides * timeloss
            total_rides += rides
            total_ratio += rides * ratio
            total_waiting += rides * waiting
            # line width encodes the ride count, limited to the range [0.5, 7]
            lw = np.clip(rides / 10, .5, 7)
            # bezier curve from the input point through the centre to the output point
            x_out = np.cos(rad_out)
            y_out = np.sin(rad_out)
            x_bezier, y_bezier = self.get_bezier_curve_in_polar([x_origin, y_origin], [x_out, y_out])
            # pull the direction label slightly towards the centre
            x = x_out - np.sign(x_out) * .1
            y = y_out - np.sign(y_out) * .1
            # paths without rides are drawn gray instead of in a class colour
            if rides > 0:
                color_loss = self.s_m.to_rgba(float(timeloss))
                color_waiting = self.s_m.to_rgba(float(waiting))
            else:
                color_loss = 'gray'
                color_waiting = 'gray'
            # timeloss plot and waiting time plot
            axs_loss[0].plot(x_bezier, y_bezier, color=color_loss, lw=lw)
            axs_loss[1].plot(x_bezier, y_bezier, color=color_waiting, lw=lw)
            # display direction label as text plot
            axs_loss[0].text(x, y, label)
            axs_loss[1].text(x, y, label)
        # setup colorbar of the loss classes
        cb1 = mpl.colorbar.ColorbarBase(
            axs_loss[2],
            cmap=self.cmap,
            norm=self.norm,
            extend='max',
        )
        cb1.set_label('Zeitverlustklassen [s]', loc='center')
        # decorate both panels: unit circle as node boundary, arrow for the
        # input direction, fixed limits and no axis box
        for ax in axs_loss[:2]:
            ax.plot(*self.get_unit_circle_data(), color='gray', lw=3, alpha=0.4)
            ax.arrow(x_origin, y_origin, -.6 * x_origin, -.6 * y_origin, width=.07, color='k', alpha=.3)
            ax.set_ylim([-1.21, 1.01])
            ax.set_xlim([-1.01, 1.01])
            ax.axis('off')
        # set plot titles
        axs_loss[0].set_title('Zeitverlust')
        axs_loss[1].set_title('Standzeit')
        # drop every row that still contains a missing value: skipped u-turns,
        # the still empty summary row and paths with missing data
        df_result.dropna(inplace=True)
        # (re-)create the summary row with the ride-weighted means of all directions
        total_idx = len(directs)
        df_result.at[total_idx, 'Richtung'] = 'Gesamt'
        df_result.at[total_idx, 'Fahrten'] = total_rides
        if total_rides > 0:
            df_result.at[total_idx, 'Zeitverlust'] = np.round(total_loss / total_rides, 2)
            df_result.at[total_idx, 'Standzeit'] = np.round(total_waiting / total_rides, 2)
            df_result.at[total_idx, 'Ratio'] = np.round(total_ratio / total_rides, 2)
        else:
            # without rides there is nothing to average; show zeros instead of NaN
            df_result.fillna(0, inplace=True)
        return fig_loss, df_result

    def get_bezier_curve_in_polar(self, p0, p1, n=15):
        """
        Method for calculating a quadratic bezier curve between two points
        with the control point (0,0).

        Both points are scaled to length one before the curve is built, so the
        curve starts and ends on the unit circle. The points must not be (0,0).

        :param p0: point coordinates of the starting point
        :param p1: point coordinates of the end point
        :param n: number of points for the line
        :return x: x coordinates of the n curve points
        :return y: y coordinates of the n curve points
        """
        # scale start and end point onto the unit circle
        norm0 = 1 / np.sum(np.array(p0) ** 2) ** .5
        norm1 = 1 / np.sum(np.array(p1) ** 2) ** .5
        x = [norm0 * p0[0], 0, norm1 * p1[0]]
        y = [norm0 * p0[1], 0, norm1 * p1[1]]
        # calculate bezier curve
        curve = bezier.Curve(np.asfortranarray([x, y]), degree=2)
        # evaluate all n parameters at once; the result has one row per
        # coordinate and one column per parameter
        points = np.linspace(0, 1, n, endpoint=True)
        xy = curve.evaluate_multi(points)
        return xy[0], xy[1]

    def get_unit_circle_data(self, n=50):
        """
        Method for generating data on the unit circle.

        :param n: number of equidistant points on the unit circle
            (first and last point coincide)
        :return x: x coordinates of the points on the unit circle
        :return y: y coordinates of the points on the unit circle
        """
        alpha = np.linspace(0, 2 * np.pi, n, endpoint=True)
        # x and y coordinates in polar coordinate system with radius = 1
        return np.cos(alpha), np.sin(alpha)

    def get_metric_color(self, value):
        """
        Method for getting the color of the time loss or waiting time class.

        :param value: feature value in seconds
        :return color: 'green' below ``bounds[2]``, 'orange' below
            ``bounds[4]``, otherwise 'red'
        """
        if value < self.bounds[2]:
            return 'green'
        if value < self.bounds[4]:
            return 'orange'
        return 'red'

    def int2str(self, x):
        """
        Format a number as integer with '.' as thousands separator.

        :param x: value convertible with ``int``
        :return: formatted string, e.g. 1234567 -> '1.234.567'
        """
        return f"{int(x):_}".replace('_', '.')

    def float2str(self, x):
        """
        Format a number with two decimals, ',' as decimal mark and '.' as
        thousands separator.

        :param x: value convertible with ``float``
        :return: formatted string, e.g. 1234.5 -> '1.234,50'
        """
        # swap the decimal point first so it is not confused with the
        # thousands separators inserted afterwards
        return f"{float(x):_.2f}".replace('.', ',').replace('_', '.')