Spaces:
Sleeping
Sleeping
File size: 10,261 Bytes
99932ef fccab54 99932ef 46d9bb2 99932ef 09f4780 9528692 46d9bb2 cb29405 0324786 46d9bb2 99932ef cb29405 99932ef cb29405 0815edc cb29405 0324786 0815edc 99932ef 0324786 46d9bb2 1263e08 99932ef 46d9bb2 0324786 46d9bb2 0324786 46d9bb2 0324786 46d9bb2 0324786 cb29405 46d9bb2 0815edc 99932ef 0815edc 46d9bb2 0815edc 46d9bb2 0815edc 46d9bb2 0815edc 99932ef 0815edc 99932ef 0815edc 99932ef 0815edc 46d9bb2 99932ef 0815edc 46d9bb2 0815edc 99932ef 0815edc 99932ef b58a1ed fccab54 b58a1ed fccab54 b58a1ed fccab54 b58a1ed |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 |
import csv
import os
from typing import List, Tuple
import math
from shapely.validation import make_valid
def score_current(station_coord, df_features, cov_smsv, w_density, w_income, w_age) -> float:
"""
Calculate a score for a station based on density, income, and age factors.
Parameters:
----------
station_coord : tuple or list
Coordinates of the station in EPSG:3057 format (long, lat).
df_features : pandas.DataFrame
DataFrame containing small area features, including 'geometry', 'density',
'income_distribution_per_year', and 'age_distribution'.
cov_smsv : list[dict[int, perc. cov]]
For the given station, this includes id and percentage of the circle
covered by the respective small area (as specified by the) for every small area
that is within the radius
w_density : float
Weight for the density factor.
w_income : float
Weight for the income factor.
w_age : float
Weight for the age factor.
# TODO add weight for traffic around this stop
Returns:
-------
float
Weighted score for the station location.
"""
# 1. Get small area ids in range
# 2.
# Score = [for all areas in range((density) * coverage percentage)]
# TODO: take into account fjoldi starfandi, if there are more people who work than live => many people need to get there, also works the other way around.
total_score = 0
total_income_score = 0
total_density_score = 0
total_age_score = 0
aggregated_age_distribution = {}
aggregated_income_distribution = {}
small_area_contributions = {}
for smsv in cov_smsv:
smsv_info = df_features[df_features["smallAreaId"] == smsv["id"]]
# Get age distribution for the year 2024
age_dist = smsv_info["age_distribution"].iloc[0].get(2024, {}) # only interested in 2024 for current score
# Aggregate proportional age distribution
for age_group, population in age_dist.items():
proportion = population * (smsv["small_zone_percentage"]/100)
if age_group in aggregated_age_distribution:
aggregated_age_distribution[age_group] += proportion
else:
aggregated_age_distribution[age_group] = proportion
# Get income distribution for the year 2024
income_dist = smsv_info["income_distribution_per_year"].iloc[0].get(2024, {}) # only interested in 2024 for current score
# Aggregate proportional income distribution
for income_group, population in income_dist.items():
proportion = population * (smsv["small_zone_percentage"] / 100)
if income_group in aggregated_income_distribution:
aggregated_income_distribution[income_group] += proportion
else:
aggregated_income_distribution[income_group] = proportion
# Calculate density score
density_contribution = smsv_info["density"].iloc[0] * w_density * smsv["small_zone_percentage"] * 200
# Calculate age score
age_contribution = get_age_score(age_dist) * w_age * smsv["small_zone_percentage"]
# Calculate income score
income_contribution = get_income_score(income_dist) * w_income * smsv["small_zone_percentage"]
# Total contribution for this small area
area_score = density_contribution + age_contribution + income_contribution
total_score += area_score
# Total age score
total_age_score += age_contribution
# Total income score
total_income_score += income_contribution
# Total density score
total_density_score += density_contribution
# Store contribution data for this small area
small_area_contributions[smsv["id"]] = {
"density_score": density_contribution,
"age_score": age_contribution,
"income_score": income_contribution,
"total_score": area_score,
}
# # Calculate age score
# age_score = get_age_score(aggregated_age_distribution) * w_age
# total_score += age_score
# # Calculate income score
# income_score = get_income_score(aggregated_income_distribution) * w_income
# total_score += income_score
return {"total_score": total_score,
"income_score": total_income_score,
"age_score": total_age_score,
"density_score": total_density_score,
"age_data": aggregated_age_distribution,
"income_data": aggregated_income_distribution,
"small_area_contributions": small_area_contributions,
}
def get_age_score(age_distribution):
"""
Calculate a score based on age distribution.
Parameters:
----------
age_distribution : dict
A dictionary with age brackets as keys and population as values.
Returns:
-------
float
Normalized age-based score.
"""
# Define weights for age groups
age_weights = {
"0-4 ára": 0.3, # Very young children unlikely to use public transport independently
"5-9 ára": 0.7, # Primary school pupils
"10-14 ára": 1.0, # Secondary school pupils more likely to use public transport
"15-19 ára": 1.5, # Teenagers, often students or apprentices, heavy reliance on buses
"20-24 ára": 1.5, # Students, apprentices, or young professionals
"25-29 ára": 1.2, # Young professionals, still a common demographic for bus users
"30-34 ára": 1.0, # Starting to decline as personal vehicle ownership increases
"35-39 ára": 0.9,
"40-44 ára": 0.8,
"45-49 ára": 0.7,
"50-54 ára": 0.6,
"55-59 ára": 0.6,
"60-64 ára": 0.8, # Approaching retirement, may rely more on buses
"65-69 ára": 1.2, # Pensioners starting to rely on public transport
"70-74 ára": 1.3, # Active pensioners, heavy bus users
"75-79 ára": 1.3,
"80-84 ára": 1.1, # Decline due to physical limitations
"85-89 ára": 0.6, # Very old, fewer likely to use buses
"90 ára og eldri": 0.2, # Most unlikely to use public transport independently
}
# Calculate the weighted sum of the age distribution
weighted_sum = sum(age_distribution.get(age, 0) * weight for age, weight in age_weights.items())
# Normalize the score by the total population
total_population = sum(age_distribution.values())
if total_population == 0:
return 0
return weighted_sum / total_population
def get_income_score(income_distribution):
"""
Calculate a score based on income distribution.
Parameters:
----------
income_distribution : dict
A dictionary with income classes as keys and population as values.
Returns:
-------
float
Normalized income-based score.
"""
# Define weights for income classes (1 = highest income, 10 = lowest)
income_weights = {
1: 0.5, # Higher-income groups less dependent on public transport
2: 0.6,
3: 0.7,
4: 0.8,
5: 1.0,
6: 1.1,
7: 1.2,
8: 1.3,
9: 1.4,
10: 1.5, # Lower-income groups more dependent on public transport
}
# Calculate the weighted sum of the income distribution
weighted_sum = sum(income_distribution.get(income_class, 0) * weight for income_class, weight in income_weights.items())
# Normalize the score by the total population
total_population = sum(income_distribution.values())
if total_population == 0:
return 0
return weighted_sum / total_population
from typing import List, Tuple, Dict
from concurrent.futures import ThreadPoolExecutor, as_completed
import math
def calculate_distance(coord1: Tuple[float, float], coord2: Tuple[float, float]) -> float:
"""Calculate the Euclidean distance between two coordinates."""
return math.sqrt((coord1[0] - coord2[0])**2 + (coord1[1] - coord2[1])**2)
def calc_score_line(stations_coordinates: List[Tuple[float, float]], station_scores: Dict[str, List[float]], w_density, w_income, w_age, radius):
PENALTY_SCALE = 1.0
individual_total_scores = [station["total_score"] for station in station_scores]
overlap_factors = [0] * len(stations_coordinates) # Initialize overlap factors for each station
total_individual_score = sum(individual_total_scores) / len(individual_total_scores) if individual_total_scores else 0
def compute_overlap(i: int, coord1: Tuple[float, float]) -> List[float]:
"""Compute the overlap factors for a single station."""
local_overlap = [0] * len(stations_coordinates)
for j, coord2 in enumerate(stations_coordinates):
if i != j: # Avoid self-comparison
distance = calculate_distance(coord1, coord2)
if distance < radius:
# Calculate overlap fraction (inverse of distance within the radius)
overlap_factor = (radius - distance) / radius # Normalize overlap to [0, 1]
local_overlap[i] += overlap_factor
local_overlap[j] += overlap_factor
return local_overlap
# Multithreading the computation of overlap factors
with ThreadPoolExecutor() as executor:
futures = {executor.submit(compute_overlap, i, coord1): i for i, coord1 in enumerate(stations_coordinates)}
for future in as_completed(futures):
i = futures[future]
local_overlap = future.result()
overlap_factors = [sum(x) for x in zip(overlap_factors, local_overlap)]
# Scale down individual scores based on overlap factors
adjusted_scores = [
max(0, score * (1 - PENALTY_SCALE * min(1, overlap_factors[i]))) # Cap scaling at 100%
for i, score in enumerate(individual_total_scores)
]
# Final aggregated score
final_score = sum(adjusted_scores) / len(adjusted_scores) if adjusted_scores else 0
# Return detailed results
result = {
"individual_scores": individual_total_scores,
"adjusted_scores": adjusted_scores,
"overlap_factors": overlap_factors,
"total_individual_score": total_individual_score,
"final_score": final_score
}
return result
|