import sys #import other libaries from specklepy.api.client import SpeckleClient from specklepy.api.credentials import get_default_account, get_local_accounts from specklepy.transports.server import ServerTransport from specklepy.api import operations from specklepy.objects.geometry import Polyline, Point from specklepy.objects import Base import numpy as np import pandas as pd import matplotlib.pyplot as plt #import seaborn as sns import math import matplotlib #from google.colab import files import json from notion_client import Client import os # Fetch the token securely from environment variables notion_token = os.getenv('notionToken') # Initialize the Notion client with your token notion = Client(auth=notion_token) # ---------------------------------------------------------------------------------- speckleToken = os.getenv('speckleToken') if speckleToken is None: raise Exception("Speckle token not found") else: print("Speckle token found successfully!") #CLIENT = SpeckleClient(host="https://speckle.xyz/") #CLIENT.authenticate_with_token(token=userdata.get(speckleToken)) CLIENT = SpeckleClient(host="https://speckle.xyz/") account = get_default_account() CLIENT.authenticate(token=speckleToken) # query full database def fetch_all_database_pages(client, database_id): """ Fetches all pages from a specified Notion database. :param client: Initialized Notion client. :param database_id: The ID of the Notion database to query. :return: A list containing all pages from the database. """ start_cursor = None all_pages = [] while True: response = client.databases.query( **{ "database_id": database_id, "start_cursor": start_cursor } ) all_pages.extend(response['results']) # Check if there's more data to fetch if response['has_more']: start_cursor = response['next_cursor'] else: break return all_pages def get_property_value(page, property_name): """ Extracts the value from a specific property in a Notion page based on its type. :param page: The Notion page data as retrieved from the API. :param property_name: The name of the property whose value is to be fetched. :return: The value or values contained in the specified property, depending on type. """ # Check if the property exists in the page if property_name not in page['properties']: return None # or raise an error if you prefer property_data = page['properties'][property_name] prop_type = property_data['type'] # Handle 'title' and 'rich_text' types if prop_type in ['title', 'rich_text']: return ''.join(text_block['text']['content'] for text_block in property_data[prop_type]) # Handle 'number' type elif prop_type == 'number': return property_data[prop_type] # Handle 'select' type elif prop_type == 'select': return property_data[prop_type]['name'] if property_data[prop_type] else None # Handle 'multi_select' type elif prop_type == 'multi_select': return [option['name'] for option in property_data[prop_type]] # Handle 'date' type elif prop_type == 'date': if property_data[prop_type]['end']: return (property_data[prop_type]['start'], property_data[prop_type]['end']) else: return property_data[prop_type]['start'] # Handle 'relation' type elif prop_type == 'relation': return [relation['id'] for relation in property_data[prop_type]] # Handle 'people' type elif prop_type == 'people': return [person['name'] for person in property_data[prop_type] if 'name' in person] # Add more handlers as needed for other property types else: # Return None or raise an error for unsupported property types return None def get_page_by_id(notion_db_pages, page_id): for pg in notion_db_pages: if pg["id"] == page_id: return pg """ def streamMatrices (speckleToken, stream_id, branch_name_dm, commit_id): #stream_id="ebcfc50abe" stream_distance_matrices = speckle_utils.getSpeckleStream(stream_id, branch_name_dm, CLIENT, commit_id = commit_id_dm) return stream_distance_matrices """ def fetchDomainMapper (luAttributePages): lu_domain_mapper ={} subdomains_unique = [] for page in lu_attributes: value_landuse = get_property_value(page, "LANDUSE") value_subdomain = get_property_value(page, "SUBDOMAIN_LIVEABILITY") if value_subdomain and value_landuse: lu_domain_mapper[value_landuse] = value_subdomain if value_subdomain != "": subdomains_unique.append(value_subdomain) #subdomains_unique = list(set(subdomains_unique)) return lu_domain_mapper def fetchSubdomainMapper (livability_attributes): attribute_mapper ={} domains_unique = [] for page in domain_attributes: subdomain = get_property_value(page, "SUBDOMAIN_UNIQUE") sqm_per_employee = get_property_value(page, "SQM PER EMPL") thresholds = get_property_value(page, "MANHATTAN THRESHOLD") max_points = get_property_value(page, "LIVABILITY MAX POINT") domain = get_property_value(page, "DOMAIN") if thresholds: attribute_mapper[subdomain] = { 'sqmPerEmpl': [sqm_per_employee if sqm_per_employee != "" else 0], 'thresholds': thresholds, 'max_points': max_points, 'domain': [domain if domain != "" else 0] } if domain != "": domains_unique.append(domain) #domains_unique = list(set(domains_unique)) return attribute_mapper def fetchDistanceMatrices (stream_distance_matrices): # navigate to list with speckle objects of interest distance_matrices = {} for distM in stream_distance_matrices["@Data"]['@{0}']: for kk in distM.__dict__.keys(): try: if kk.split("+")[1].startswith("distance_matrix"): distance_matrix_dict = json.loads(distM[kk]) origin_ids = distance_matrix_dict["origin_uuid"] destination_ids = distance_matrix_dict["destination_uuid"] distance_matrix = distance_matrix_dict["matrix"] # Convert the distance matrix to a DataFrame df_distances = pd.DataFrame(distance_matrix, index=origin_ids, columns=destination_ids) # i want to add the index & colum names to dist_m_csv #distance_matrices[kk] = dist_m_csv[kk] distance_matrices[kk] = df_distances except: pass return distance_matrices def splitDictByStrFragmentInColumnName(original_dict, substrings): result_dicts = {substring: {} for substring in substrings} for key, nested_dict in original_dict.items(): for subkey, value in nested_dict.items(): for substring in substrings: if substring in subkey: if key not in result_dicts[substring]: result_dicts[substring][key] = {} result_dicts[substring][key][subkey] = value return result_dicts def landusesToSubdomains(DistanceMatrix, LanduseDf, LanduseToSubdomainDict, UniqueSubdomainsList): df_LivabilitySubdomainsArea = pd.DataFrame(0, index=DistanceMatrix.index, columns=UniqueSubdomainsList) for subdomain in UniqueSubdomainsList: for lu, lu_subdomain in LanduseToSubdomainDict.items(): if lu_subdomain == subdomain: if lu in LanduseDf.columns: if LanduseDf[lu] is not None: df_LivabilitySubdomainsArea[subdomain] = df_LivabilitySubdomainsArea[subdomain].add(LanduseDf[lu], fill_value=0) else: print(f"Warning: Column '{lu}' not found in landuse database") return df_LivabilitySubdomainsArea def FindWorkplacesNumber (DistanceMatrix,SubdomainAttributeDict,destinationWeights,UniqueSubdomainsList ): df_LivabilitySubdomainsWorkplaces = pd.DataFrame(0, index=DistanceMatrix.index, columns=['jobs']) for subdomain in UniqueSubdomainsList: for key, value_list in SubdomainAttributeDict.items(): sqm_per_empl = float(SubdomainAttributeDict[subdomain]['sqmPerEmpl']) if key in destinationWeights.columns and key == subdomain: if sqm_per_empl > 0: df_LivabilitySubdomainsWorkplaces['jobs'] += (round(destinationWeights[key] / sqm_per_empl,2)).fillna(0) else: df_LivabilitySubdomainsWorkplaces['jobs'] += 0 return df_LivabilitySubdomainsWorkplaces def computeAccessibility (DistanceMatrix, destinationWeights=None,alpha = 0.0038, threshold = 600): decay_factors = np.exp(-alpha * DistanceMatrix) * (DistanceMatrix <= threshold) # for weighted accessibility (e. g. areas) if destinationWeights is not None: #not destinationWeights.empty: subdomainsAccessibility = pd.DataFrame(index=DistanceMatrix.index, columns=destinationWeights.columns) for col in destinationWeights.columns: subdomainsAccessibility[col] = (decay_factors * destinationWeights[col].values).sum(axis=1) else: print("Destination weights parameter is None") return subdomainsAccessibility def computeAccessibility_pointOfInterest (DistanceMatrix, columnName, alpha = 0.0038, threshold = 600): decay_factors = np.exp(-alpha * DistanceMatrix) * (DistanceMatrix <= threshold) pointOfInterestAccessibility = pd.DataFrame(index=DistanceMatrix.index, columns=[columnName]) for col in pointOfInterestAccessibility.columns: pointOfInterestAccessibility[col] = (decay_factors * 1).sum(axis=1) return pointOfInterestAccessibility def remap(value, B_min, B_max, C_min, C_max): return C_min + (((value - B_min) / (B_max - B_min))* (C_max - C_min)) def accessibilityToLivability (DistanceMatrix,accessibilityInputs, SubdomainAttributeDict,UniqueDomainsList): livability = pd.DataFrame(index=DistanceMatrix.index, columns=accessibilityInputs.columns) for domain in UniqueDomainsList: livability[domain] = 0 livability.fillna(0, inplace=True) templist = [] # remap accessibility to livability points for key, values in SubdomainAttributeDict.items(): threshold = float(SubdomainAttributeDict[key]['thresholds']) max_livability = float(SubdomainAttributeDict[key]['max_points']) domains = [str(item) for item in SubdomainAttributeDict[key]['domain']] if key in accessibilityInputs.columns and key != 'commercial': livability_score = remap(accessibilityInputs[key], 0, threshold, 0, max_livability) livability.loc[accessibilityInputs[key] >= threshold, key] = max_livability livability.loc[accessibilityInputs[key] < threshold, key] = livability_score if any(domains): for domain in domains: if domain != 'Workplaces': livability.loc[accessibilityInputs[key] >= threshold, domain] += max_livability livability.loc[accessibilityInputs[key] < threshold, domain] += livability_score elif key == 'commercial': livability_score = remap(accessibilityInputs['jobs'], 0, threshold, 0, max_livability) livability.loc[accessibilityInputs['jobs'] >= threshold, domains[0]] = max_livability livability.loc[accessibilityInputs['jobs'] < threshold, domains[0]] = livability_score return livability