File size: 10,261 Bytes
99932ef
 
fccab54
 
99932ef
46d9bb2
99932ef
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
09f4780
 
9528692
46d9bb2
 
 
cb29405
0324786
46d9bb2
99932ef
cb29405
99932ef
 
cb29405
0815edc
cb29405
 
 
 
 
 
 
 
 
0324786
0815edc
99932ef
0324786
 
 
 
 
 
 
 
46d9bb2
1263e08
99932ef
46d9bb2
 
0324786
46d9bb2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0324786
 
46d9bb2
 
 
0324786
46d9bb2
 
0324786
cb29405
46d9bb2
0815edc
 
 
 
 
 
 
99932ef
0815edc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46d9bb2
0815edc
 
46d9bb2
0815edc
 
 
 
 
46d9bb2
0815edc
 
99932ef
0815edc
 
 
 
99932ef
0815edc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
99932ef
0815edc
46d9bb2
99932ef
0815edc
46d9bb2
0815edc
 
99932ef
0815edc
99932ef
b58a1ed
 
 
 
 
 
 
 
 
 
fccab54
 
 
 
 
b58a1ed
 
 
fccab54
 
 
 
 
 
b58a1ed
 
 
 
 
 
 
 
 
 
 
fccab54
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b58a1ed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
import csv
import os 
from typing import List, Tuple
import math

from shapely.validation import make_valid

def score_current(station_coord, df_features, cov_smsv, w_density, w_income, w_age) -> float:
    """
    Calculate a score for a station based on density, income, and age factors.

    Parameters:
    ----------
    station_coord : tuple or list
        Coordinates of the station in EPSG:3057 format (long, lat).
    df_features : pandas.DataFrame
        DataFrame containing small area features, including 'geometry', 'density', 
        'income_distribution_per_year', and 'age_distribution'.
    cov_smsv : list[dict[int, perc. cov]]
        For the given station, this includes id and percentage of the circle
        covered by the respective small area (as specified by the) for every small area
        that is within the radius
    w_density : float
        Weight for the density factor.
    w_income : float
        Weight for the income factor.
    w_age : float
        Weight for the age factor.
    # TODO add weight for traffic around this stop

    Returns:
    -------
    float
        Weighted score for the station location.
    """
    # 1. Get small area ids in range
    # 2. 
    # Score = [for all areas in range((density) * coverage percentage)]

    # TODO: take into account fjoldi starfandi, if there are more people who work than live => many people need to get there, also works the other way around.
    total_score = 0
    total_income_score = 0
    total_density_score = 0
    total_age_score = 0
    aggregated_age_distribution = {}
    aggregated_income_distribution = {}
    small_area_contributions = {}

    for smsv in cov_smsv:
        smsv_info = df_features[df_features["smallAreaId"] == smsv["id"]]

        # Get age distribution for the year 2024
        age_dist = smsv_info["age_distribution"].iloc[0].get(2024, {})  # only interested in 2024 for current score
        
        # Aggregate proportional age distribution
        for age_group, population in age_dist.items():
            proportion = population * (smsv["small_zone_percentage"]/100)
            if age_group in aggregated_age_distribution:
                aggregated_age_distribution[age_group] += proportion
            else:
                aggregated_age_distribution[age_group] = proportion

        # Get income distribution for the year 2024
        income_dist = smsv_info["income_distribution_per_year"].iloc[0].get(2024, {})  # only interested in 2024 for current score

        # Aggregate proportional income distribution
        for income_group, population in income_dist.items():
            proportion = population * (smsv["small_zone_percentage"] / 100)
            if income_group in aggregated_income_distribution:
                aggregated_income_distribution[income_group] += proportion
            else:
                aggregated_income_distribution[income_group] = proportion

        # Calculate density score
        density_contribution = smsv_info["density"].iloc[0] * w_density * smsv["small_zone_percentage"] * 200

        # Calculate age score
        age_contribution = get_age_score(age_dist) * w_age * smsv["small_zone_percentage"]

        # Calculate income score
        income_contribution = get_income_score(income_dist) * w_income * smsv["small_zone_percentage"]

        # Total contribution for this small area
        area_score = density_contribution + age_contribution + income_contribution
        total_score += area_score
        
        # Total age score
        total_age_score += age_contribution
        # Total income score
        total_income_score += income_contribution
        # Total density score 
        total_density_score += density_contribution

        # Store contribution data for this small area
        small_area_contributions[smsv["id"]] = {
            "density_score": density_contribution,
            "age_score": age_contribution,
            "income_score": income_contribution,
            "total_score": area_score,
        }

    # # Calculate age score
    # age_score = get_age_score(aggregated_age_distribution) * w_age
    # total_score += age_score

    # # Calculate income score
    # income_score = get_income_score(aggregated_income_distribution) * w_income
    # total_score += income_score

    return {"total_score": total_score, 
            "income_score": total_income_score, 
            "age_score": total_age_score, 
            "density_score": total_density_score, 
            "age_data": aggregated_age_distribution, 
            "income_data": aggregated_income_distribution,
            "small_area_contributions": small_area_contributions,
    }

def get_age_score(age_distribution):
    """
    Rate an age distribution by how strongly each bracket is weighted.

    Parameters:
    ----------
    age_distribution : dict
        Maps an age bracket label (e.g. "15-19 ára") to a population count.
        Brackets without a weight below contribute nothing to the weighted
        sum but still count towards the total population.

    Returns:
    -------
    float
        Population-weighted average of the bracket weights, or 0 when the
        total population is zero.
    """
    # Relative weight per age bracket (higher = more weight in the score).
    bracket_weights = {
        "0-4 ára": 0.3,    # toddlers rarely travel by bus on their own
        "5-9 ára": 0.7,    # primary school pupils
        "10-14 ára": 1.0,  # older pupils, more likely to ride
        "15-19 ára": 1.5,  # teenagers: students/apprentices, heavy bus users
        "20-24 ára": 1.5,  # students, apprentices, young professionals
        "25-29 ára": 1.2,  # young professionals, still frequent riders
        "30-34 ára": 1.0,  # usage declines as car ownership rises
        "35-39 ára": 0.9,
        "40-44 ára": 0.8,
        "45-49 ára": 0.7,
        "50-54 ára": 0.6,
        "55-59 ára": 0.6,
        "60-64 ára": 0.8,  # nearing retirement, may rely more on buses
        "65-69 ára": 1.2,  # pensioners starting to rely on public transport
        "70-74 ára": 1.3,  # active pensioners, heavy bus users
        "75-79 ára": 1.3,
        "80-84 ára": 1.1,  # decline due to physical limitations
        "85-89 ára": 0.6,  # very old, fewer riders
        "90 ára og eldri": 0.2,  # least likely to travel independently
    }

    headcount = sum(age_distribution.values())
    if headcount == 0:
        # Nothing to normalise by.
        return 0

    weighted_terms = [
        age_distribution.get(bracket, 0) * factor
        for bracket, factor in bracket_weights.items()
    ]
    return sum(weighted_terms) / headcount

def get_income_score(income_distribution):
    """
    Rate an income distribution by how strongly each income class is weighted.

    Parameters:
    ----------
    income_distribution : dict
        Maps an income class (integers 1 through 10) to a population count.
        Keys outside 1..10 add nothing to the weighted sum but still count
        towards the total population.

    Returns:
    -------
    float
        Population-weighted average of the class weights, or 0 when the
        total population is zero.
    """
    # Class 1 is the highest income and gets the lowest weight; class 10 is
    # the lowest income and gets the highest weight.
    class_factors = (0.5, 0.6, 0.7, 0.8, 1.0, 1.1, 1.2, 1.3, 1.4, 1.5)
    weight_by_class = dict(zip(range(1, 11), class_factors))

    headcount = sum(income_distribution.values())
    if headcount == 0:
        # Nothing to normalise by.
        return 0

    weighted_terms = [
        income_distribution.get(income_class, 0) * factor
        for income_class, factor in weight_by_class.items()
    ]
    return sum(weighted_terms) / headcount


from typing import List, Tuple, Dict
from concurrent.futures import ThreadPoolExecutor, as_completed
import math

def calculate_distance(coord1: Tuple[float, float], coord2: Tuple[float, float]) -> float:
    """Return the Euclidean distance between two 2-D coordinates.

    Only the first two components of each coordinate are used.
    ``math.hypot`` is used instead of squaring by hand so that very large
    coordinate differences do not raise ``OverflowError``.
    """
    return math.hypot(coord1[0] - coord2[0], coord1[1] - coord2[1])

def calc_score_line(stations_coordinates: List[Tuple[float, float]], station_scores: List[dict], w_density, w_income, w_age, radius):
    """
    Score a whole line by averaging station scores after an overlap penalty.

    Stations closer together than ``radius`` are penalised: each such pair
    gets an overlap value ``(radius - distance) / radius`` in (0, 1], and a
    station's score is scaled by ``1 - min(1, overlap_factor)``.

    Parameters:
    ----------
    stations_coordinates : list of (x, y) tuples
        One coordinate per station, in the same order as station_scores.
    station_scores : list of dict
        One dict per station; only the "total_score" entry is read.
    w_density, w_income, w_age :
        Not read by this function; kept so existing callers keep working.
    radius : float
        Distance below which two stations are considered overlapping.

    Returns:
    -------
    dict
        "individual_scores": the raw total score per station,
        "adjusted_scores": the scores after the overlap penalty,
        "overlap_factors": the accumulated overlap per station,
        "total_individual_score": mean of the raw scores (0 if empty),
        "final_score": mean of the adjusted scores (0 if empty).
    """
    PENALTY_SCALE = 1.0
    individual_total_scores = [station["total_score"] for station in station_scores]
    total_individual_score = (
        sum(individual_total_scores) / len(individual_total_scores)
        if individual_total_scores
        else 0
    )

    # Visit every unordered pair once, in a fixed order. This is pure-Python
    # CPU-bound work, so a thread pool brings no speed-up, and summing
    # results in completion order would make float rounding nondeterministic.
    station_count = len(stations_coordinates)
    pair_overlap = [0] * station_count
    for i, coord1 in enumerate(stations_coordinates):
        for j in range(i + 1, station_count):
            distance = calculate_distance(coord1, stations_coordinates[j])
            if distance < radius:
                # Linear falloff: 1 when the stations coincide, 0 at the radius.
                overlap = (radius - distance) / radius
                pair_overlap[i] += overlap
                pair_overlap[j] += overlap

    # Each pair counts twice per station, which preserves the established
    # scores: the earlier per-station passes each credited both members of
    # a pair. NOTE(review): confirm the doubling is intended.
    overlap_factors = [2 * value for value in pair_overlap]

    # Scale down individual scores based on overlap factors; the penalty is
    # capped at 100% so a score never goes negative because of overlap.
    adjusted_scores = [
        max(0, score * (1 - PENALTY_SCALE * min(1, overlap_factors[i])))
        for i, score in enumerate(individual_total_scores)
    ]

    final_score = sum(adjusted_scores) / len(adjusted_scores) if adjusted_scores else 0

    return {
        "individual_scores": individual_total_scores,
        "adjusted_scores": adjusted_scores,
        "overlap_factors": overlap_factors,
        "total_individual_score": total_individual_score,
        "final_score": final_score,
    }