Spaces:
Sleeping
Sleeping
import gradio as gr | |
import pandas as pd | |
import numpy as np | |
import json | |
from io import StringIO | |
from collections import OrderedDict | |
import os | |
# ---------------------- Accessing data from Notion ---------------------- # | |
from notion_client import Client as client_notion | |
notionToken = os.getenv('notionToken') | |
if notionToken is None: | |
raise Exception("Notion token not found. Please check the environment variables.") | |
else: | |
print("Notion token found successfully!") | |
from config import landuseDatabaseId , subdomainAttributesDatabaseId | |
from imports_utils import fetch_all_database_pages | |
from imports_utils import get_property_value | |
from imports_utils import notion | |
from config import landuseColumnName | |
from config import subdomainColumnName | |
from config import sqmPerEmployeeColumnName | |
from config import thresholdsColumnName | |
from config import maxPointsColumnName | |
from config import domainColumnName | |
landuse_attributes = fetch_all_database_pages(notion, landuseDatabaseId) | |
livability_attributes = fetch_all_database_pages(notion, subdomainAttributesDatabaseId) | |
# fetch the dictionary with landuse - domain pairs | |
landuseMapperDict ={} | |
subdomains_unique = [] | |
for page in landuse_attributes: | |
value_landuse = get_property_value(page, "LANDUSE") | |
value_subdomain = get_property_value(page, "SUBDOMAIN_LIVABILITY") | |
if value_subdomain and value_landuse: | |
landuseMapperDict[value_landuse] = value_subdomain | |
if value_subdomain != "": | |
subdomains_unique.append(value_subdomain) | |
# fetch the dictionary with subdomain attribute data | |
attributeMapperDict ={} | |
domains_unique = [] | |
for page in livability_attributes: | |
subdomain = get_property_value(page, "SUBDOMAIN_LIVABILITY") | |
sqm_per_employee = get_property_value(page, "SQM PER EMPL") | |
thresholds = get_property_value(page, "MANHATTAN THRESHOLD") | |
max_points = get_property_value(page, "LIVABILITY MAX POINT") | |
domain = get_property_value(page, "DOMAIN_LIVABILITY") | |
if thresholds: | |
attributeMapperDict[subdomain] = { | |
'sqmPerEmpl': sqm_per_employee if sqm_per_employee != "" else 0, | |
'thresholds': thresholds, | |
'max_points': max_points, | |
'domain': [domain if domain != "" else 0] | |
} | |
if domain != "": | |
domains_unique.append(domain) | |
# ---------------------- Accessing data from Speckle ---------------------- # | |
from specklepy.api.client import SpeckleClient | |
from specklepy.api.credentials import get_default_account, get_local_accounts | |
from specklepy.transports.server import ServerTransport | |
from specklepy.api import operations | |
from specklepy.objects.geometry import Polyline, Point | |
from specklepy.objects import Base | |
import imports_utils | |
import speckle_utils | |
import data_utils | |
from config import landuseDatabaseId , streamId, dmBranchName, dmCommitId, luBranchName, luCommitId | |
from imports_utils import speckleToken | |
from imports_utils import fetchDistanceMatrices | |
from config import distanceMatrixActivityNodes | |
from config import distanceMatrixTransportStops | |
CLIENT = SpeckleClient(host="https://speckle.xyz/") | |
account = get_default_account() | |
CLIENT.authenticate_with_token(token=speckleToken) | |
streamDistanceMatrices = speckle_utils.getSpeckleStream(streamId,dmBranchName,CLIENT, dmCommitId) | |
matrices = fetchDistanceMatrices (streamDistanceMatrices) | |
streamLanduses = speckle_utils.getSpeckleStream(streamId,luBranchName,CLIENT, luCommitId) | |
streamData = streamLanduses["@Data"]["@{0}"] | |
df_speckle_lu = speckle_utils.get_dataframe(streamData, return_original_df=False) | |
df_lu = df_speckle_lu.copy() | |
df_lu = df_lu.astype(str) | |
# set index column | |
df_lu = df_lu.set_index("ids", drop=False) | |
df_dm = matrices[distanceMatrixActivityNodes] | |
df_dm_transport = matrices[distanceMatrixTransportStops] | |
dm_dictionary = df_dm.to_dict('index') | |
df_dm_transport_dictionary = df_dm_transport.to_dict('index') | |
# filter activity nodes attributes | |
mask_connected = df_dm.index.tolist() | |
lu_columns = [] | |
for name in df_lu.columns: | |
if name.startswith("lu+"): | |
lu_columns.append(name) | |
df_lu_filtered = df_lu[lu_columns].loc[mask_connected] | |
df_lu_filtered.columns = [col.replace('lu+', '') for col in df_lu_filtered.columns] | |
df_lu_filtered.columns = [col.replace('ASSETS+', '') for col in df_lu_filtered.columns] | |
df_lu_filtered = df_lu_filtered.astype(int) | |
df_lu_filtered = df_lu_filtered.T.groupby(level=0).sum().T | |
df_lu_filtered_dict = df_lu_filtered.to_dict('index') | |
def test(input_json): | |
print("Received input") | |
# Parse the input JSON string | |
try: | |
inputs = json.loads(input_json) | |
except json.JSONDecodeError: | |
inputs = json.loads(input_json.replace("'", '"')) | |
# ------------------------- Accessing input data from Grasshopper ------------------------- # | |
matrix = inputs['input']["matrix"] | |
matrix_transport = inputs['input']["transportMatrix"] | |
landuses = inputs['input']["landuse_areas"] | |
""" | |
if df_lu_filtered is None or df_lu_filtered.empty: | |
landuses = inputs['input']["landuse_areas"] | |
df_landuses = pd.DataFrame(landuses).T | |
df_landuses = df_landuses.round(0).astype(int) | |
else: | |
df_landuses = df_lu_filtered | |
df_landuses = df_landuses.round(0).astype(int) | |
""" | |
df_landuses = df_lu_filtered | |
df_landuses = df_landuses.round(0).astype(int) | |
attributeMapperDict_gh = inputs['input']["attributeMapperDict"] | |
landuseMapperDict_gh = inputs['input']["landuseMapperDict"] | |
alpha = inputs['input']["alpha"] | |
alpha = float(alpha) | |
threshold = inputs['input']["threshold"] | |
threshold = float(threshold) | |
df_matrix = pd.DataFrame(matrix).T | |
df_matrix = df_matrix.round(0).astype(int) | |
from imports_utils import splitDictByStrFragmentInColumnName | |
# List containing the substrings to check against | |
tranportModes = ["DRT", "GMT", "HSR"] | |
result_dicts = splitDictByStrFragmentInColumnName(df_dm_transport_dictionary, tranportModes) | |
# Accessing each dictionary | |
art_dict = result_dicts["DRT"] | |
gmt_dict = result_dicts["GMT"] | |
df_art_matrix = pd.DataFrame(art_dict).T | |
df_art_matrix = df_art_matrix.round(0).astype(int) | |
df_gmt_matrix = pd.DataFrame(gmt_dict).T | |
df_gmt_matrix = df_art_matrix.round(0).astype(int) | |
# create a mask based on the matrix size and ids, crop activity nodes to the mask | |
mask_connected = df_dm.index.tolist() | |
valid_indexes = [idx for idx in mask_connected if idx in df_landuses.index] | |
# Identify and report missing indexes | |
missing_indexes = set(mask_connected) - set(valid_indexes) | |
if missing_indexes: | |
print(f"Error: The following indexes were not found in the DataFrame: {missing_indexes}, length: {len(missing_indexes)}") | |
# Apply the filtered mask | |
df_landuses_filtered = df_landuses.loc[valid_indexes] | |
# find a set of unique domains, to which subdomains are aggregated | |
temp = [] | |
for key, values in attributeMapperDict.items(): | |
domain = attributeMapperDict[key]['domain'] | |
for item in domain: | |
if ',' in item: | |
domain_list = item.split(',') | |
attributeMapperDict[key]['domain'] = domain_list | |
for domain in domain_list: | |
temp.append(domain) | |
else: | |
if item != 0: | |
temp.append(item) | |
domainsUnique = list(set(temp)) | |
# find a list of unique subdomains, to which land uses are aggregated | |
temp = [] | |
for key, values in landuseMapperDict.items(): | |
subdomain = str(landuseMapperDict[key]) | |
if subdomain != 0: | |
temp.append(subdomain) | |
subdomainsUnique = list(set(temp)) | |
from imports_utils import landusesToSubdomains | |
from imports_utils import FindWorkplacesNumber | |
from imports_utils import computeAccessibility | |
from imports_utils import computeAccessibility_pointOfInterest | |
from imports_utils import remap | |
from imports_utils import accessibilityToLivability | |
LivabilitySubdomainsWeights = landusesToSubdomains(df_dm,df_lu_filtered,landuseMapperDict,subdomainsUnique) | |
WorkplacesNumber = FindWorkplacesNumber(df_dm,attributeMapperDict,LivabilitySubdomainsWeights,subdomainsUnique) | |
# prepare an input weights dataframe for the parameter LivabilitySubdomainsInputs | |
LivabilitySubdomainsInputs =pd.concat([LivabilitySubdomainsWeights, WorkplacesNumber], axis=1) | |
subdomainsAccessibility = computeAccessibility(df_dm,LivabilitySubdomainsInputs,alpha,threshold) | |
artAccessibility = computeAccessibility_pointOfInterest(df_art_matrix,'ART',alpha,threshold) | |
gmtAccessibility = computeAccessibility_pointOfInterest(df_gmt_matrix,'GMT+HSR',alpha,threshold) | |
AccessibilityInputs = pd.concat([subdomainsAccessibility, artAccessibility,gmtAccessibility], axis=1) | |
if 'jobs' not in subdomainsAccessibility.columns: | |
print("Error: Column 'jobs' does not exist in the subdomainsAccessibility.") | |
livability = accessibilityToLivability(df_dm,AccessibilityInputs,attributeMapperDict,domainsUnique) | |
livability_dictionary = livability.to_dict('index') | |
LivabilitySubdomainsInputs_dictionary = LivabilitySubdomainsInputs.to_dict('index') | |
subdomainsAccessibility_dictionary = AccessibilityInputs.to_dict('index') | |
artmatrix = df_art_matrix.to_dict('index') | |
LivabilitySubdomainsWeights_dictionary = LivabilitySubdomainsWeights.to_dict('index') | |
# Prepare the output | |
output = { | |
#"subdomainsAccessibility_dictionary": subdomainsAccessibility_dictionary, | |
#"livability_dictionary": livability_dictionary, | |
"subdomainsWeights_dictionary": LivabilitySubdomainsInputs_dictionary, | |
"luDomainMapper": landuseMapperDict, | |
"attributeMapper": attributeMapperDict, | |
"fetchDm": dm_dictionary, | |
"landuses":df_lu_filtered_dict | |
} | |
return json.dumps(output) | |
# Define the Gradio interface with a single JSON input | |
iface = gr.Interface( | |
fn=test, | |
inputs=gr.Textbox(label="Input JSON", lines=20, placeholder="Enter JSON with all parameters here..."), | |
outputs=gr.JSON(label="Output JSON"), | |
title="testspace" | |
) | |
iface.launch() |