import ast
import logging
import os
from pathlib import Path

import numpy as np
import pandas as pd
# configure logging to a file and to the console
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("process_data.log"),
        logging.StreamHandler(),
    ],
)
CITIES_DATA = os.path.join("data", "raw", "2024_06_24_cities_0624_v5.csv")
DATA_ENRICHED = os.path.join("data", "cities_enriched.csv")
# name-to-Kreiscode metadata (the code is also a column in the coordinates table)
NAME_CODE_DATA = os.path.join("data", "raw", "name_kreiscode.csv")
CODES_KOMMUNEN = os.path.join("data", "raw", "Deutschlandatlas.csv")
# coordinates for Gemeinden
COORDINATES = os.path.join("data", "raw", "coordinates_plz_kreiscode.csv")

# make sure the output directory exists
Path("data", "preprocessed").mkdir(parents=True, exist_ok=True)
def load_cities(path: str) -> pd.DataFrame:
    """Load the raw cities file, drop the name column, and remove duplicate rows."""
    df = pd.read_csv(path)
    df.drop(columns=["name"], inplace=True)
    df.drop_duplicates(inplace=True)
    return df
def create_code_mapper(path: str) -> dict:
    """Build a mapping from region name to official code."""
    name_code = pd.read_csv(
        path, sep=";", encoding="latin_1", names=["Datum", "Code", "Name", "Fläche"]
    )[7:13929]
    # add all Landkreise and Gemeinden to the mapper; NaN names (read as floats)
    # are collapsed into the placeholder key "0000"
    code_mapper = {
        (key if not isinstance(key, float) else "0000"): value
        for key, value in zip(name_code["Name"], name_code["Code"])
    }
    # add all Gemeindeverbände to the mapper
    kommunen_code = pd.read_csv(CODES_KOMMUNEN, sep=";", encoding="latin_1")
    code_mapper_update = {
        key: value
        for key, value in zip(kommunen_code["name"], kommunen_code["Gebietskennziffer"])
    }
    logging.debug("Gemeindeverband codes: %s", code_mapper_update)
    code_mapper.update(code_mapper_update)
    return code_mapper
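# Rough shape of the resulting mapper (hypothetical entries; the real names and
# codes come from name_kreiscode.csv and Deutschlandatlas.csv):
#   {"Flensburg, Stadt": "01001", "München, Landeshauptstadt": "09162", ...}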
def map_code(org_name, code_mapper):
    """Return the code whose mapper key best matches the given name."""
    # split the name into individual words
    parts = org_name.split()
    # prefer a key that contains every word of the name
    # (cases like "Landkreis München", "kreisfreie Stadt München")
    for key in code_mapper.keys():
        if all(part in key for part in parts):
            return code_mapper[key]
    # otherwise fall back to a key that contains at least one of the words
    for key in code_mapper.keys():
        if any(part in key for part in parts):
            return code_mapper[key]
    # no key matches any part of the name
    return None
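# Example of the intended matching order (hypothetical mapper contents):
#   mapper = {"München, Landeshauptstadt": "09162", "München, Landkreis": "09184"}
#   map_code("Landkreis München", mapper) -> "09184"  (a key containing every
#   word of the query wins over a key containing only some of them)
#   map_code("Unbekanntes Dorf", mapper) -> None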
# main goal here: identify Landkreise and their codes
def add_code(df: pd.DataFrame, code_mapper: dict) -> pd.DataFrame:
    """Add the (Kreis-/Gemeinde-)code to the dataframe based on the name of the (administrative) region."""
    df["Code"] = df["Kommune"].apply(lambda x: map_code(x, code_mapper))
    df["Code"] = df["Code"].apply(lambda x: int(x) if x is not None else None)
    return df
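# Illustrative call (hypothetical mapper):
#   add_code(pd.DataFrame({"Kommune": ["Landkreis München"]}), {"München, Landkreis": "09184"})
# yields Code 9184; note that the int conversion drops any leading zero the raw
# code string may have had.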
def org_in_plzname(org_name, plz_name):
    """Check whether any word of the region name occurs in the PLZ name."""
    parts = org_name.split()
    return any(part in plz_name for part in parts)
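# e.g. org_in_plzname("Stadt Aachen", "Aachen") is True because "Aachen" occurs
# in the PLZ name; note this is a plain substring test on each word, so generic
# tokens such as "Stadt" can also produce matches (e.g. against "Stadtlohn").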
def load_coordinates(path: str) -> pd.DataFrame:
    return pd.read_csv(path, sep=";")
# maybe store 2D coordinates directly instead of a geometry column
def merge_coordinates(df: pd.DataFrame, coordinates: pd.DataFrame) -> pd.DataFrame:
    """Merge the coordinates of the regions into the dataframe. Try the
    Kreis code first if it has four or five digits. Otherwise match the
    region name against the PLZ names, and fall back to a Kreis code
    inferred from the Gebietskennziffer.
    """
    geometries = []
    for row in df.itertuples():
        # add coordinates for Landkreise (4- or 5-digit codes)
        if pd.notna(row.Code) and len(str(int(row.Code))) in (4, 5):
            coor = coordinates[coordinates["Kreis code"] == row.Code]
            geometry = [co.geo_point_2d for co in coor.itertuples()]
            geometries.append(geometry)
        else:
            coor = coordinates[
                coordinates["PLZ Name (short)"].apply(
                    lambda x: org_in_plzname(row.Kommune, x)
                )
            ]
            # add coordinates for Gemeinde names found in the coordinates table
            if len(coor) > 0:
                geometry = [co.geo_point_2d for co in coor.itertuples()]
                geometries.append(geometry)
            # add coordinates via the Kreis code inferred from the Gebietskennziffer
            elif pd.notna(row.Code):
                code_str = str(int(row.Code))
                if len(code_str) < 4:
                    # short codes: match them as a prefix of the Kreis code
                    coor = coordinates[
                        coordinates["Kreis code"]
                        .astype(str)
                        .str[: len(code_str)]
                        == code_str
                    ]
                elif code_str[:2] in ["11", "12", "13", "14", "15", "16"]:
                    # state prefixes 11-16: the first five digits are the Kreis code
                    coor = coordinates[
                        coordinates["Kreis code"] == int(code_str[:5])
                    ]
                else:
                    # otherwise the first four digits are the Kreis code
                    coor = coordinates[
                        coordinates["Kreis code"] == int(code_str[:4])
                    ]
                geometry = [co.geo_point_2d for co in coor.itertuples()]
                geometries.append(geometry)
            else:
                geometries.append([])
    df["Geometry"] = geometries
    return df
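# Minimal usage sketch (hypothetical values, column names as used above):
#   df = pd.DataFrame({"Kommune": ["Musterstadt"], "Code": [12345]})
#   coords = pd.DataFrame({"Kreis code": [12345], "PLZ Name (short)": ["Musterstadt"],
#                          "geo_point_2d": ["52.5, 13.4"]})
#   merge_coordinates(df, coords)  # the Geometry column holds ["52.5, 13.4"] for that row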
def aggregate_coordinates(geo_element):
    """Collapse a list of "lat, lon" strings into one mean point (numpy array),
    or return an empty list if there are no coordinates."""
    if geo_element == "[]" or geo_element == []:
        return []
    # geo_element is already a list here; if the dataframe was re-read from CSV,
    # the column would contain strings and need ast.literal_eval first
    actual_list = geo_element  # ast.literal_eval(geo_element)
    processed_list = [list(map(float, coord.split(", "))) for coord in actual_list]
    if len(processed_list) > 1:
        # average multiple points into a single representative coordinate
        coordinates = np.mean(np.array(processed_list), axis=0)
    else:
        coordinates = np.array(processed_list[0])
    return coordinates
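# e.g. aggregate_coordinates(["48.1, 11.5", "48.3, 11.7"]) averages the two
# points to array([48.2, 11.6]), a single-element list becomes that point as an
# array, and [] stays [].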
if __name__ == "__main__":
    code_mapper = create_code_mapper(NAME_CODE_DATA)
    logging.info("Code mapper created")

    cities = load_cities(CITIES_DATA)
    data = add_code(cities, code_mapper)
    missing = data[data["Code"].isnull()]
    logging.info(f"Missing values Gebietscode: {len(missing)}")
    data.to_csv(
        os.path.join("data", "preprocessed", "cities_enriched_with_code.csv"),
        index=False,
    )
    # data = pd.read_csv(
    #     os.path.join("data", "preprocessed", "cities_enriched_with_code.csv"))
    data["Code"] = data["Code"].apply(lambda x: int(x) if pd.notna(x) else None)

    coordinates = load_coordinates(COORDINATES)
    data = merge_coordinates(data, coordinates)
    data.to_csv(
        os.path.join("data", "preprocessed", "cities_enriched_with_coordinates.csv"),
        index=False,
    )
    logging.info("Coordinates merged")

    # rows without geometry and without a code are truly missing
    missing = data[data["Geometry"].apply(lambda x: x == []) & data["Code"].isnull()]
    missing_geometry = data[data["Geometry"].apply(lambda x: x == [])]
    logging.info(f"Missing values: {len(missing)}")
    logging.info(f"Missing geometry: {len(missing_geometry)}")
    missing_geometry.to_csv(os.path.join("data", "missing_values.csv"), index=False)

    # data = pd.read_csv(os.path.join("data", "cities_enriched_manually.csv"))
    data["Geometry"] = data["Geometry"].apply(aggregate_coordinates)
    data.to_csv(DATA_ENRICHED, index=False)