|
import io
import json
import time

import geopandas as gpd
import pandas as pd
import requests
import streamlit as st
|
|
|
class DataModel:
    """
    Client for the BIQE-Monitor aggregation service.

    Loads node time-quality data from the backend (triggering and polling an
    asynchronous calculation when necessary) and transforms the response into
    the dictionaries / DataFrames the dashboard displays.
    """

    def __init__(self):
        # Base URL of the aggregation backend (production cluster).
        self.base_url = 'https://infrasense-dashboard-aggregation-service-prod.delta.k8s-wdy.de'
        # Seconds to wait between polls of an asynchronous calculation.
        self.sleep = 10

    def __headers(self):
        """Return the API-key authentication headers used by every request."""
        return {'X-API-KEY': st.secrets['api_key']}

    @st.cache_data(show_spinner='Daten werden geladen ...')
    def get_data_of_node(_self, node_id, extension_radius):
        """
        Load and transform node time-quality data from the BIQE-Monitor.

        The leading-underscore ``_self`` tells ``st.cache_data`` not to hash
        the instance when building the cache key.

        :param node_id: id of the node
        :param extension_radius: distance around the node geometry for better feature estimation
        :return directs: dictionary with the main riding directions of the node
        :return feature: dictionary of DataFrames with time-quality features per traveling direction
        :return kpis: main KPIs of the node for displaying in the dashboard
        :return gdf: GeoDataFrame with node geometry and extended node geometry
        """
        url = f"{_self.base_url}/estimateTimeFeatureOfNodeAsync/{node_id}/{extension_radius}"
        # NOTE(review): no timeout is set, so this call can block indefinitely
        # if the backend hangs — consider adding one.
        response = requests.get(url, headers=_self.__headers())

        if response.status_code == 200:
            loading_finished = True
        elif response.status_code == 202:
            # Calculation was started asynchronously -> poll until it is done.
            response, loading_finished = _self.__waiting_for_calculation(node_id, extension_radius)
        else:
            loading_finished = False

        if not loading_finished:
            st.error('Error while data loading')
            st.stop()

        return _self.__extract_backend_data(response)

    def __waiting_for_calculation(self, node_id, extension_radius):
        """
        Poll the BIQE-Monitor endpoint until the asynchronous calculation finishes.

        :param node_id: node id for calculation
        :param extension_radius: radius around node for more possible data points
        :return response: final response from the status endpoint
        :return status: True if the data finished loading successfully (HTTP 200)
        """
        # NOTE(review): polls forever while the backend keeps answering 202;
        # assumes the backend eventually succeeds or fails.
        while True:
            time.sleep(self.sleep)
            response = self.__check_request(node_id, extension_radius)
            if response.status_code != 202:
                break
        return response, response.status_code == 200

    def __check_request(self, node_id, extension_radius):
        """
        Ask the backend whether the async calculation of node data is finished.

        :param node_id: node id for calculation
        :param extension_radius: radius around node for more possible data points
        :return: response from the task-status endpoint
        """
        url = self.base_url + f'/task_status/{node_id}/{extension_radius}/NodeTimeLoss'
        return requests.get(url, headers=self.__headers())

    def __extract_backend_data(self, response):
        """
        Extract the main variables from the request json.

        :param response: response object whose json payload holds the result
        :return directs: dictionary with the main riding directions of the node
        :return feature: dictionary of DataFrames with time-quality features per traveling direction
        :return kpis: main KPIs of the node for displaying in the dashboard
        :return gdf: GeoDataFrame with node geometry and extended node geometry
        """
        json_data = response.json()
        if 'state' not in json_data:
            # Backend answered without a result payload; show the raw answer
            # so the user can report it, then halt this script run.
            st.write(json_data)
            st.error('Reload page again')
            st.stop()
        json_data = json_data['state']

        directs = dict(json_data['directs'])

        # Each feature arrives as a JSON-serialised DataFrame. Wrap the string
        # in StringIO: pd.read_json on a literal string is deprecated (pandas 2.1+).
        feature_names = ('RideCount', 'Standing2LossRatio', 'TimeLoss', 'StandingTime')
        feature = {name: pd.read_json(io.StringIO(json_data['feature'][name]))
                   for name in feature_names}

        kpis = json_data['kpis']

        gdf = gpd.GeoDataFrame.from_features(json.loads(kpis['geometry']))
        return directs, feature, kpis, gdf
|
|