import requests import streamlit as st import geopandas as gpd import pandas as pd import json import time class DataModel: def __init__(self,): self.base_url = 'https://infrasense-dashboard-aggregation-service-prod.delta.k8s-wdy.de' self.sleep = 10 #s @st.cache_data(show_spinner='Daten werden geladen ...') def get_data_of_node(_self, node_id, extension_radius): """ Method for loading and transforming node time quality data from BIQE-Monitor :param node_id: id of the node :param extension_radius: distance around the node geometry for better feture estimation :return directs: dictonary with main riding directions of the node :return feature: dictonary with time quality feature saved in a dataframe with traveling directions :return time: dictonary with rush hour feature saved in a dataframe with traveling directions :return kpis: main KPIs of the node for displaying in the dashboard :return gdf: GeoDataFrame with node geometry and extended node geometry """ # build request url url = f"{_self.base_url}/estimateTimeFeatureOfNodeAsync/{node_id}/{extension_radius}" response = requests.get(url,headers={'X-API-KEY':st.secrets['api_key']}) loading_finished = False if response.status_code == 200: loading_finished = True elif response.status_code == 202: response, loading_finished = _self.__waiting_for_calculation(node_id, extension_radius) # get error meassage if data loading failed if not loading_finished: st.error('Error while data loading') st.stop() # extrect feature variables from returned request json return _self.__extract_backend_data(response) def __waiting_for_calculation(self, node_id, extension_radius): """ Method for asyncron requests for Biqe monitor endpoint :param node_id: node id for calculation :param extension_radius: radius around node for more possible data points :return data_json: json with response data :return status: status of finished data loading """ # proof if data is loaded time.sleep(self.sleep) response = self.__check_request(node_id, extension_radius) while response.status_code == 202: time.sleep(self.sleep) response = self.__check_request(node_id, extension_radius) # if finished if response.status_code == 200: status = True else: status = False return response, status def __check_request(self, node_id, extension_radius): """ Method for looking if async calculation of node data is finished :param node_id: node id for calculation :param extension_radius: radius around node for more possible data points :return req: response data from endpoint """ url = self.base_url + f'/task_status/{node_id}/{extension_radius}/NodeTimeLoss' return requests.get(url, headers={'X-API-KEY':st.secrets['api_key']}) def __extract_backend_data(self, response): """ Method for extrction of the main variables from the request json :param json_data: json from request :return directs: dictonary with main riding directions of the node :return feature: dictonary with time quality feature saved in a dataframe with traveling directions :return time: dictonary with rush hour feature saved in a dataframe with traveling directions :return kpis: main KPIs of the node for displaying in the dashboard :return gdf: GeoDataFrame with node geometry and extended node geometry """ json_data = response.json() if 'state' not in json_data: st.write(json_data) st.error('Reload page again') st.stop() json_data = json_data['state'] # extract main riding directions of the node directs = dict(json_data['directs']) # extract and process time quality feature saved in a dataframe with traveling directions feature = {'RideCount':pd.read_json(json_data['feature']['RideCount']), 'Standing2LossRatio':pd.read_json(json_data['feature']['Standing2LossRatio']), 'TimeLoss':pd.read_json(json_data['feature']['TimeLoss']), 'StandingTime':pd.read_json(json_data['feature']['StandingTime'])} # extract main KPIs of the node for displaying in the dashboard kpis = json_data['kpis'] # extract GeoDataFrame with node geometry and extended node geometry gdf = gpd.GeoDataFrame.from_features(json.loads(kpis['geometry'])) return directs, feature, kpis, gdf