nastasiasnk's picture
Update app.py
157cf08 verified
raw
history blame
14.2 kB
import gradio as gr
import pandas as pd
import numpy as np
import json
from io import StringIO
from collections import OrderedDict
import os
# ---------------------- Accessing data from Notion ---------------------- #
from notion_client import Client as client_notion
notionToken = os.getenv('notionToken')
if notionToken is None:
raise Exception("Notion token not found. Please check the environment variables.")
else:
print("Notion token found successfully!")
from config import landuseDatabaseId , subdomainAttributesDatabaseId
from imports_utils import fetch_all_database_pages
from imports_utils import get_property_value
from imports_utils import notion
landuse_attributes = fetch_all_database_pages(notion, landuseDatabaseId)
livability_attributes = fetch_all_database_pages(notion, subdomainAttributesDatabaseId)
# fetch the dictionary with landuse - domain pairs
landuseMapperDict ={}
subdomains_unique = []
for page in landuse_attributes:
value_landuse = get_property_value(page, "LANDUSE")
value_subdomain = get_property_value(page, "SUBDOMAIN_LIVEABILITY")
if value_subdomain and value_landuse:
landuseMapperDict[value_landuse] = value_subdomain
if value_subdomain != "":
subdomains_unique.append(value_subdomain)
# fetch the dictionary with subdomain attribute data
attributeMapperDict ={}
domains_unique = []
for page in livability_attributes:
subdomain = get_property_value(page, "SUBDOMAIN_UNIQUE")
sqm_per_employee = get_property_value(page, "SQM PER EMPL")
thresholds = get_property_value(page, "MANHATTAN THRESHOLD")
max_points = get_property_value(page, "LIVABILITY MAX POINT")
domain = get_property_value(page, "DOMAIN")
if thresholds:
attributeMapperDict[subdomain] = {
'sqmPerEmpl': sqm_per_employee if sqm_per_employee != "" else 0,
'thresholds': thresholds,
'max_points': max_points,
'domain': [domain if domain != "" else 0]
}
if domain != "":
domains_unique.append(domain)
# ---------------------- Accessing data from Speckle ---------------------- #
from specklepy.api.client import SpeckleClient
from specklepy.api.credentials import get_default_account, get_local_accounts
from specklepy.transports.server import ServerTransport
from specklepy.api import operations
from specklepy.objects.geometry import Polyline, Point
from specklepy.objects import Base
import imports_utils
import speckle_utils
import data_utils
from config import landuseDatabaseId , streamId, dmBranchName, dmCommitId, luBranchName, luCommitId
from imports_utils import speckleToken
from imports_utils import fetchDistanceMatrices
from config import distanceMatrixActivityNodes
from config import distanceMatrixTransportStops
CLIENT = SpeckleClient(host="https://speckle.xyz/")
account = get_default_account()
CLIENT.authenticate_with_token(token=speckleToken)
streamDistanceMatrices = speckle_utils.getSpeckleStream(streamId,dmBranchName,CLIENT, dmCommitId)
matrices = fetchDistanceMatrices (streamDistanceMatrices)
streamLanduses = speckle_utils.getSpeckleStream(streamId,luBranchName,CLIENT, luCommitId)
streamData = streamLanduses["@Data"]["@{0}"]
df_speckle_lu = speckle_utils.get_dataframe(streamData, return_original_df=False)
df_lu = df_speckle_lu.copy()
# set index column
df_lu = df_lu.set_index("ids", drop=False)
df_dm = matrices[distanceMatrixActivityNodes]
df_dm_transport = matrices[distanceMatrixTransportStops]
dm_dictionary = df_dm.to_dict('index')
df_dm_transport_dictionary = df_dm_transport.to_dict('index')
# filter activity nodes attributes
mask_connected = df_dm.index.tolist()
lu_columns = []
for name in df_lu.columns:
if name.startswith("lu+"):
lu_columns.append(name)
df_lu_filtered = df_lu[lu_columns].loc[mask_connected]
df_lu_filtered
df_lu_filtered_dict = df_lu_filtered.to_dict('index')
def test(input_json):
print("Received input")
# Parse the input JSON string
try:
inputs = json.loads(input_json)
except json.JSONDecodeError:
inputs = json.loads(input_json.replace("'", '"'))
# Accessing input data from Grasshopper
matrix = inputs['input']["matrix"]
landuses = inputs['input']["landuse_areas"]
transport_matrix = inputs['input']["transportMatrix"]
#attributeMapperDict = inputs['input']["attributeMapperDict"]
#landuseMapperDict = inputs['input']["landuseMapperDict"]
alpha = inputs['input']["alpha"]
alpha = float(alpha)
threshold = inputs['input']["threshold"]
threshold = float(threshold)
df_matrix = pd.DataFrame(matrix).T
df_matrix = df_matrix.round(0).astype(int)
df_landuses = pd.DataFrame(landuses).T
df_landuses = df_landuses.round(0).astype(int)
# List containing the substrings to check against
tranportModes = ["DRT", "GMT", "HSR"]
def split_dict_by_subkey(original_dict, substrings):
result_dicts = {substring: {} for substring in substrings}
for key, nested_dict in original_dict.items():
for subkey, value in nested_dict.items():
for substring in substrings:
if substring in subkey:
if key not in result_dicts[substring]:
result_dicts[substring][key] = {}
result_dicts[substring][key][subkey] = value
return result_dicts
result_dicts = split_dict_by_subkey(df_dm_transport, tranportModes)
# Accessing each dictionary
art_dict = result_dicts["DRT"]
gmt_dict = result_dicts["GMT"]
df_art_matrix = pd.DataFrame(art_dict).T
df_art_matrix = df_art_matrix.round(0).astype(int)
df_gmt_matrix = pd.DataFrame(gmt_dict).T
df_gmt_matrix = df_art_matrix.round(0).astype(int)
# create a mask based on the matrix size and ids, crop activity nodes to the mask
mask_connected = df_dm.index.tolist()
valid_indexes = [idx for idx in mask_connected if idx in df_landuses.index]
# Identify and report missing indexes
missing_indexes = set(mask_connected) - set(valid_indexes)
if missing_indexes:
print(f"Error: The following indexes were not found in the DataFrame: {missing_indexes}, length: {len(missing_indexes)}")
# Apply the filtered mask
df_landuses_filtered = df_landuses.loc[valid_indexes]
# find a set of unique domains, to which subdomains are aggregated
temp = []
for key, values in attributeMapperDict.items():
domain = attributeMapperDict[key]['domain']
for item in domain:
if ',' in item:
domain_list = item.split(',')
attributeMapperDict[key]['domain'] = domain_list
for domain in domain_list:
temp.append(domain)
else:
if item != 0:
temp.append(item)
domainsUnique = list(set(temp))
# find a list of unique subdomains, to which land uses are aggregated
temp = []
for key, values in landuseMapperDict.items():
subdomain = str(landuseMapperDict[key])
if subdomain != 0:
temp.append(subdomain)
subdomainsUnique = list(set(temp))
def landusesToSubdomains(DistanceMatrix, LanduseDf, LanduseToSubdomainDict, UniqueSubdomainsList):
df_LivabilitySubdomainsArea = pd.DataFrame(0, index=DistanceMatrix.index, columns=UniqueSubdomainsList)
for subdomain in UniqueSubdomainsList:
for lu, lu_subdomain in LanduseToSubdomainDict.items():
if lu_subdomain == subdomain:
if lu in LanduseDf.columns:
df_LivabilitySubdomainsArea[subdomain] = df_LivabilitySubdomainsArea[subdomain].add(LanduseDf[lu], fill_value=0)
else:
print(f"Warning: Column '{lu}' not found in landuse database")
return df_LivabilitySubdomainsArea
LivabilitySubdomainsWeights = landusesToSubdomains(df_dm,df_landuses_filtered,landuseMapperDict,subdomainsUnique)
def FindWorkplaces (DistanceMatrix,SubdomainAttributeDict,destinationWeights,UniqueSubdomainsList ):
df_LivabilitySubdomainsWorkplaces = pd.DataFrame(0, index=DistanceMatrix.index, columns=['jobs'])
for subdomain in UniqueSubdomainsList:
for key, value_list in SubdomainAttributeDict.items():
sqm_per_empl = float(SubdomainAttributeDict[subdomain]['sqmPerEmpl']) #[0])
if key in destinationWeights.columns and key == subdomain:
if sqm_per_empl > 0:
df_LivabilitySubdomainsWorkplaces['jobs'] += (round(destinationWeights[key] / sqm_per_empl,2)).fillna(0)
else:
df_LivabilitySubdomainsWorkplaces['jobs'] += 0
return df_LivabilitySubdomainsWorkplaces
WorkplacesNumber = FindWorkplaces(df_dm,attributeMapperDict,LivabilitySubdomainsWeights,subdomainsUnique)
# prepare an input weights dataframe for the parameter LivabilitySubdomainsInputs
LivabilitySubdomainsInputs =pd.concat([LivabilitySubdomainsWeights, WorkplacesNumber], axis=1)
def computeAccessibility (DistanceMatrix, destinationWeights=None,alpha = 0.0038, threshold = 600):
decay_factors = np.exp(-alpha * DistanceMatrix) * (DistanceMatrix <= threshold)
# for weighted accessibility (e. g. areas)
if destinationWeights is not None: #not destinationWeights.empty:
subdomainsAccessibility = pd.DataFrame(index=DistanceMatrix.index, columns=destinationWeights.columns)
for col in destinationWeights.columns:
subdomainsAccessibility[col] = (decay_factors * destinationWeights[col].values).sum(axis=1)
else:
print("Destination weights parameter is None")
return subdomainsAccessibility
def computeAccessibility_pointOfInterest (DistanceMatrix, columnName, alpha = 0.0038, threshold = 600):
decay_factors = np.exp(-alpha * DistanceMatrix) * (DistanceMatrix <= threshold)
pointOfInterestAccessibility = pd.DataFrame(index=DistanceMatrix.index, columns=[columnName])
for col in pointOfInterestAccessibility.columns:
pointOfInterestAccessibility[col] = (decay_factors * 1).sum(axis=1)
return pointOfInterestAccessibility
subdomainsAccessibility = computeAccessibility(df_dm,LivabilitySubdomainsInputs,alpha,threshold)
artAccessibility = computeAccessibility_pointOfInterest(df_art_matrix,'ART',alpha,threshold)
gmtAccessibility = computeAccessibility_pointOfInterest(df_gmt_matrix,'GMT+HSR',alpha,threshold)
AccessibilityInputs = pd.concat([subdomainsAccessibility, artAccessibility,gmtAccessibility], axis=1)
def remap(value, B_min, B_max, C_min, C_max):
return C_min + (((value - B_min) / (B_max - B_min))* (C_max - C_min))
if 'jobs' not in subdomainsAccessibility.columns:
print("Error: Column 'jobs' does not exist in the subdomainsAccessibility.")
def accessibilityToLivability (DistanceMatrix,accessibilityInputs, SubdomainAttributeDict,UniqueDomainsList):
livability = pd.DataFrame(index=DistanceMatrix.index, columns=accessibilityInputs.columns)
for domain in UniqueDomainsList:
livability[domain] = 0
livability.fillna(0, inplace=True)
templist = []
# remap accessibility to livability points
for key, values in SubdomainAttributeDict.items():
threshold = float(SubdomainAttributeDict[key]['thresholds'])
max_livability = float(SubdomainAttributeDict[key]['max_points'])
domains = [str(item) for item in SubdomainAttributeDict[key]['domain']]
if key in accessibilityInputs.columns and key != 'commercial':
livability_score = remap(accessibilityInputs[key], 0, threshold, 0, max_livability)
livability.loc[accessibilityInputs[key] >= threshold, key] = max_livability
livability.loc[accessibilityInputs[key] < threshold, key] = livability_score
if any(domains):
for domain in domains:
if domain != 'Workplaces':
livability.loc[accessibilityInputs[key] >= threshold, domain] += max_livability
livability.loc[accessibilityInputs[key] < threshold, domain] += livability_score
elif key == 'commercial':
livability_score = remap(accessibilityInputs['jobs'], 0, threshold, 0, max_livability)
livability.loc[accessibilityInputs['jobs'] >= threshold, domains[0]] = max_livability
livability.loc[accessibilityInputs['jobs'] < threshold, domains[0]] = livability_score
return livability
livability = accessibilityToLivability(df_dm,AccessibilityInputs,attributeMapperDict,domainsUnique)
livability_dictionary = livability.to_dict('index')
LivabilitySubdomainsInputs_dictionary = LivabilitySubdomainsInputs.to_dict('index')
subdomainsAccessibility_dictionary = AccessibilityInputs.to_dict('index')
artmatrix = df_art_matrix.to_dict('index')
# Prepare the output
output = {
"subdomainsAccessibility_dictionary": subdomainsAccessibility_dictionary,
"livability_dictionary": livability_dictionary,
"subdomainsWeights_dictionary": LivabilitySubdomainsInputs_dictionary,
"luDomainMapper": landuseMapperDict,
"attributeMapper": attributeMapperDict,
"fetchDm": dm_dictionary,
"artmatrix":artmatrix
}
return json.dumps(output)
# Define the Gradio interface with a single JSON input
iface = gr.Interface(
fn=test,
inputs=gr.Textbox(label="Input JSON", lines=20, placeholder="Enter JSON with all parameters here..."),
outputs=gr.JSON(label="Output JSON"),
title="testspace"
)
iface.launch()