import os
import datetime
import time
import requests
import pandas as pd
import json
from geopy.geocoders import Nominatim


def convert_date_to_unix(x):
    """Convert a datetime (or its '%Y-%m-%d %H:%M:%S' string form) to unix time in milliseconds.

    NOTE(review): the parsed datetime is naive, so `.timestamp()` interprets it
    in the machine's local timezone — confirm callers expect local time.
    """
    dt_obj = datetime.datetime.strptime(str(x), '%Y-%m-%d %H:%M:%S')
    return int(dt_obj.timestamp() * 1000)


def get_city_coordinates(city_name: str):
    """
    Takes city name and returns its latitude and longitude
    (rounded to 2 digits after dot).
    """
    # Initialize Nominatim API (for getting lat and long of the city)
    geolocator = Nominatim(user_agent="MyApp")
    city = geolocator.geocode(city_name)
    latitude = round(city.latitude, 2)
    longitude = round(city.longitude, 2)
    return latitude, longitude


##################################### EEA
def convert_to_daily(df, pollutant: str):
    """
    Returns DataFrame where pollutant column is resampled to days and rounded.

    Missing days (introduced by resampling) are filled with the column median,
    then values are rounded to whole numbers.
    """
    res_df = df.copy()
    # convert dates in 'time' column
    res_df["date"] = pd.to_datetime(res_df["date"])

    # I want data daily, not hourly (mean per each day = 1 datarow per 1 day)
    res_df = res_df.set_index('date')
    res_df = res_df[pollutant].resample('1d').mean().reset_index()
    res_df[pollutant] = res_df[pollutant].fillna(res_df[pollutant].median())
    res_df[pollutant] = res_df[pollutant].apply(lambda x: round(x, 0))

    return res_df


def find_fullest_csv(csv_links: list, year: str):
    """Among the CSV links containing `year`, return the one with the most rows as a DataFrame.

    Raises IndexError when no link matches the year.
    """
    candidates = [link for link in csv_links if str(year) in link]
    biggest_df = pd.read_csv(candidates[0])
    for link in candidates[1:]:
        _df = pd.read_csv(link)
        if len(biggest_df) < len(_df):
            biggest_df = _df
    return biggest_df


def get_air_quality_from_eea(city_name: str,
                             pollutant: str,
                             start_year: str,
                             end_year: str):
    """
    Takes city name, daterange and returns pandas DataFrame with daily air quality data.
    It parses data by 1-year batches, so please specify years, not dates. (example: "2014", "2022"...)

    EEA means European Environmental Agency. So it has data for Europe Union countries ONLY.
    """
    start_of_cell = time.time()

    params = {
        'CountryCode': '',
        'CityName': city_name,
        'Pollutant': pollutant.upper(),
        'Year_from': start_year,
        'Year_to': end_year,
        'Station': '',
        'Source': 'All',
        'Samplingpoint': '',
        'Output': 'TEXT',
        'UpdateDate': '',
        'TimeCoverage': 'Year'
    }

    # observations endpoint
    base_url = "https://fme.discomap.eea.europa.eu/fmedatastreaming/AirQualityDownload/AQData_Extract.fmw?"
    try:
        response = requests.get(base_url, params=params)
    except requests.exceptions.ConnectionError:
        # FIX: was builtin ConnectionError, which requests never raises — the
        # one-shot retry below could never trigger.
        response = requests.get(base_url, params=params)
    response.encoding = response.apparent_encoding
    csv_links = response.text.split("\r\n")

    res_df = pd.DataFrame()
    for year in range(int(start_year), int(end_year) + 1):
        try:
            # find the fullest, the biggest csv file with observations for this particular year
            _df = find_fullest_csv(csv_links, year)
            # append it to res_df
            res_df = pd.concat([res_df, _df])
        except IndexError:
            # FIX: was `{city}` (undefined name → NameError); now uses city_name.
            print(f"!! Missing data for {year} for {city_name} city.")

    pollutant = pollutant.lower()
    if pollutant == "pm2.5":
        pollutant = "pm2_5"
    res_df = res_df.rename(columns={
        'DatetimeBegin': 'date',
        'Concentration': pollutant
    })

    # cut timezones info
    res_df['date'] = res_df['date'].apply(lambda x: x[:-6])
    # convert dates in 'time' column
    res_df['date'] = pd.to_datetime(res_df['date'])

    res_df = convert_to_daily(res_df, pollutant)

    res_df['city_name'] = city_name
    res_df = res_df[['city_name', 'date', pollutant.lower()]]

    end_of_cell = time.time()
    print(f"Processed {pollutant.upper()} for {city_name} since {start_year} till {end_year}.")
    print(f"Took {round(end_of_cell - start_of_cell, 2)} sec.\n")

    return res_df


##################################### USEPA
city_code_dict = {}
pollutant_dict = {
    'CO': '42101',
    'SO2': '42401',
    'NO2': '42602',
    'O3': '44201',
    'PM10': '81102',
    'PM2.5': '88101'
}


def get_city_code(city_name: str):
    """Encodes city name to be used later for data parsing using USEPA.

    Lazily populates the module-level `city_code_dict` cache from the USEPA
    CBSA list endpoint on first call, then resolves by substring match.
    """
    if city_code_dict:
        city_full = [i for i in city_code_dict.keys() if city_name in i][0]
        return city_code_dict[city_full]
    else:
        params = {
            "email": "test@aqs.api",
            "key": "test"
        }
        response = requests.get("https://aqs.epa.gov/data/api/list/cbsas?", params)
        response_json = response.json()
        data = response_json["Data"]
        for item in data:
            city_code_dict[item['value_represented']] = item['code']
        return get_city_code(city_name)


def get_air_quality_from_usepa(city_name: str,
                               pollutant: str,
                               start_date: str,
                               end_date: str):
    """
    Takes city name, daterange and returns pandas DataFrame with daily air quality data.

    USEPA means United States Environmental Protection Agency. So it has data for US ONLY.
    """
    start_of_cell = time.time()
    res_df = pd.DataFrame()

    for start_date_, end_date_ in make_date_intervals(start_date, end_date):
        params = {
            "email": "test@aqs.api",
            "key": "test",
            "param": pollutant_dict[pollutant.upper().replace("_", ".")],  # encoded pollutant
            "bdate": start_date_,
            "edate": end_date_,
            "cbsa": get_city_code(city_name)  # Core-based statistical area
        }

        # observations endpoint
        base_url = "https://aqs.epa.gov/data/api/dailyData/byCBSA?"
        response = requests.get(base_url, params=params)
        response_json = response.json()

        df_ = pd.DataFrame(response_json["Data"])

        pollutant = pollutant.lower()
        if pollutant == "pm2.5":
            pollutant = "pm2_5"
        df_ = df_.rename(columns={
            'date_local': 'date',
            'arithmetic_mean': pollutant
        })

        # convert dates in 'date' column
        df_['date'] = pd.to_datetime(df_['date'])
        df_['city_name'] = city_name
        df_ = df_[['city_name', 'date', pollutant]]
        res_df = pd.concat([res_df, df_])

    # there are duplicated rows (several records for the same day and station). get rid of it.
    res_df = res_df.groupby(['date', 'city_name'], as_index=False)[pollutant].mean()
    res_df[pollutant] = round(res_df[pollutant], 1)

    end_of_cell = time.time()
    print(f"Processed {pollutant.upper()} for {city_name} since {start_date} till {end_date}.")
    print(f"Took {round(end_of_cell - start_of_cell, 2)} sec.\n")

    return res_df


def make_date_intervals(start_date, end_date):
    """Split [start_date, end_date] ('%Y-%m-%d' strings) into per-year
    ('%Y%m%d', '%Y%m%d') interval tuples, clipped to the requested range."""
    start_dt = datetime.datetime.strptime(start_date, '%Y-%m-%d')
    end_dt = datetime.datetime.strptime(end_date, '%Y-%m-%d')
    date_intervals = []
    for year in range(start_dt.year, end_dt.year + 1):
        year_start = datetime.datetime(year, 1, 1)
        year_end = datetime.datetime(year, 12, 31)
        interval_start = max(start_dt, year_start)
        interval_end = min(end_dt, year_end)
        # FIX: was `<`, which silently dropped single-day intervals
        # (e.g. start_date == end_date, or a range ending on Jan 1).
        if interval_start <= interval_end:
            date_intervals.append((interval_start.strftime('%Y%m%d'),
                                   interval_end.strftime('%Y%m%d')))
    return date_intervals


##################################### Weather Open Meteo
def get_weather_data_from_open_meteo(city_name: str,
                                     start_date: str,
                                     end_date: str,
                                     coordinates: list = None,
                                     forecast: bool = False):
    """
    Takes [city name OR coordinates] and returns pandas DataFrame with weather data.

    Examples of arguments:
        coordinates=(47.755, -122.2806), start_date="2023-01-01"
    """
    start_of_cell = time.time()

    if coordinates:
        latitude, longitude = coordinates
    else:
        latitude, longitude = get_city_coordinates(city_name=city_name)

    params = {
        'latitude': latitude,
        'longitude': longitude,
        'daily': ["temperature_2m_max", "temperature_2m_min",
                  "precipitation_sum", "rain_sum", "snowfall_sum",
                  "precipitation_hours", "windspeed_10m_max",
                  "windgusts_10m_max", "winddirection_10m_dominant"],
        'start_date': start_date,
        'end_date': end_date,
        'timezone': "Europe/London"
    }

    if forecast:
        # historical forecast endpoint
        base_url = 'https://api.open-meteo.com/v1/forecast'
    else:
        # historical observations endpoint
        base_url = 'https://archive-api.open-meteo.com/v1/archive'

    try:
        response = requests.get(base_url, params=params)
    except requests.exceptions.ConnectionError:
        # FIX: was builtin ConnectionError, which requests never raises.
        response = requests.get(base_url, params=params)

    response_json = response.json()
    res_df = pd.DataFrame(response_json["daily"])
    res_df["city_name"] = city_name

    # rename columns
    res_df = res_df.rename(columns={
        "time": "date",
        "temperature_2m_max": "temperature_max",
        "temperature_2m_min": "temperature_min",
        "windspeed_10m_max": "wind_speed_max",
        "winddirection_10m_dominant": "wind_direction_dominant",
        "windgusts_10m_max": "wind_gusts_max"
    })

    # change columns order
    res_df = res_df[
        ['city_name', 'date', 'temperature_max', 'temperature_min',
         'precipitation_sum', 'rain_sum', 'snowfall_sum',
         'precipitation_hours', 'wind_speed_max', 'wind_gusts_max',
         'wind_direction_dominant']
    ]

    # convert dates in 'date' column
    res_df["date"] = pd.to_datetime(res_df["date"])
    end_of_cell = time.time()
    print(f"Parsed weather for {city_name} since {start_date} till {end_date}.")
    print(f"Took {round(end_of_cell - start_of_cell, 2)} sec.\n")

    return res_df


##################################### Air Quality data from Open Meteo
def get_aqi_data_from_open_meteo(city_name: str,
                                 start_date: str,
                                 end_date: str,
                                 coordinates: list = None,
                                 pollutant: str = "pm2_5"):
    """
    Takes [city name OR coordinates] and returns pandas DataFrame with AQI data.

    Examples of arguments:
        ...
        coordinates=(47.755, -122.2806),
        start_date="2023-01-01",
        pollutant="no2"
        ...
    """
    start_of_cell = time.time()

    if coordinates:
        latitude, longitude = coordinates
    else:
        latitude, longitude = get_city_coordinates(city_name=city_name)

    pollutant = pollutant.lower()
    if pollutant == "pm2.5":
        pollutant = "pm2_5"
    # make it work with both "no2" and "nitrogen_dioxide" passed.
    if pollutant == "no2":
        pollutant = "nitrogen_dioxide"

    params = {
        'latitude': latitude,
        'longitude': longitude,
        'hourly': [pollutant],
        'start_date': start_date,
        'end_date': end_date,
        'timezone': "Europe/London"
    }

    # base endpoint
    base_url = "https://air-quality-api.open-meteo.com/v1/air-quality"
    try:
        response = requests.get(base_url, params=params)
    except requests.exceptions.ConnectionError:
        # FIX: was builtin ConnectionError, which requests never raises.
        response = requests.get(base_url, params=params)

    response_json = response.json()
    res_df = pd.DataFrame(response_json["hourly"])

    # convert dates
    res_df["time"] = pd.to_datetime(res_df["time"])

    # resample to days
    res_df = res_df.groupby(res_df['time'].dt.date).mean(numeric_only=True).reset_index()
    res_df[pollutant] = round(res_df[pollutant], 1)

    # rename columns
    res_df = res_df.rename(columns={
        "time": "date"
    })

    res_df["city_name"] = city_name

    # change columns order
    res_df = res_df[
        ['city_name', 'date', pollutant]
    ]
    end_of_cell = time.time()
    print(f"Processed {pollutant.upper()} for {city_name} since {start_date} till {end_date}.")
    print(f"Took {round(end_of_cell - start_of_cell, 2)} sec.\n")

    return res_df