import os
import datetime
import time
import requests
import pandas as pd
import json
from geopy.geocoders import Nominatim
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
from matplotlib.ticker import MultipleLocator
import openmeteo_requests
import requests_cache
from retry_requests import retry
import hopsworks
import hsfs
from pathlib import Path
def get_historical_weather(city, start_date, end_date, latitude, longitude):
    """Download daily historical weather for one location from Open-Meteo.

    Args:
        city: Label written to the ``city`` column of the result; it is not
            sent to the API.
        start_date: First day of the range, forwarded as the ``start_date``
            query parameter (presumably an ISO ``YYYY-MM-DD`` string; confirm
            against callers).
        end_date: Last day of the range, forwarded as ``end_date``.
        latitude: Latitude of the location.
        longitude: Longitude of the location.

    Returns:
        A DataFrame with the columns ``date``, ``temperature_2m_mean``,
        ``precipitation_sum``, ``wind_speed_10m_max``,
        ``wind_direction_10m_dominant`` and ``city``. Rows with any missing
        value are dropped.
    """
    # latitude, longitude = get_city_coordinates(city)

    # Setup the Open-Meteo API client with cache and retry on error.
    # expire_after=-1 keeps cached responses indefinitely.
    cache_session = requests_cache.CachedSession('.cache', expire_after=-1)
    retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
    openmeteo = openmeteo_requests.Client(session=retry_session)

    # NOTE(review): the endpoint literal was empty in the source. This is the
    # documented Open-Meteo historical (archive) endpoint, which accepts the
    # start_date/end_date/daily parameters used below - confirm it is the one intended.
    url = "https://archive-api.open-meteo.com/v1/archive"

    # Make sure all required weather variables are listed here.
    # The order of variables in "daily" is important to assign them correctly below.
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "start_date": start_date,
        "end_date": end_date,
        "daily": [
            "temperature_2m_mean",
            "precipitation_sum",
            "wind_speed_10m_max",
            "wind_direction_10m_dominant",
        ],
    }
    responses = openmeteo.weather_api(url, params=params)

    # Process first location. Add a for-loop for multiple locations or weather models
    response = responses[0]
    print(f"Coordinates {response.Latitude()}°N {response.Longitude()}°E")
    print(f"Elevation {response.Elevation()} m asl")
    print(f"Timezone {response.Timezone()} {response.TimezoneAbbreviation()}")
    print(f"Timezone difference to GMT+0 {response.UtcOffsetSeconds()} s")

    # Process daily data. The order of variables needs to be the same as requested.
    daily = response.Daily()
    daily_temperature_2m_mean = daily.Variables(0).ValuesAsNumpy()
    daily_precipitation_sum = daily.Variables(1).ValuesAsNumpy()
    daily_wind_speed_10m_max = daily.Variables(2).ValuesAsNumpy()
    daily_wind_direction_10m_dominant = daily.Variables(3).ValuesAsNumpy()

    # One timestamp per returned interval; "left" excludes the end bound itself.
    daily_data = {
        "date": pd.date_range(
            start=pd.to_datetime(daily.Time(), unit="s"),
            end=pd.to_datetime(daily.TimeEnd(), unit="s"),
            freq=pd.Timedelta(seconds=daily.Interval()),
            inclusive="left",
        )
    }
    daily_data["temperature_2m_mean"] = daily_temperature_2m_mean
    daily_data["precipitation_sum"] = daily_precipitation_sum
    daily_data["wind_speed_10m_max"] = daily_wind_speed_10m_max
    daily_data["wind_direction_10m_dominant"] = daily_wind_direction_10m_dominant

    daily_dataframe = pd.DataFrame(data=daily_data)
    daily_dataframe = daily_dataframe.dropna()
    daily_dataframe['city'] = city
    return daily_dataframe
def get_hourly_weather_forecast(city, latitude, longitude):
    """Download the hourly weather forecast for one location from Open-Meteo.

    Args:
        city: Not used by this function; kept so the signature stays parallel
            to ``get_historical_weather``.
        latitude: Latitude of the location.
        longitude: Longitude of the location.

    Returns:
        A DataFrame with one row per forecast hour and the columns ``date``,
        ``temperature_2m_mean``, ``precipitation_sum``, ``wind_speed_10m_max``
        and ``wind_direction_10m_dominant``. The hourly values are stored
        under the daily-style column names. Rows with any missing value are
        dropped.
    """
    # latitude, longitude = get_city_coordinates(city)

    # Setup the Open-Meteo API client with cache and retry on error.
    # Forecasts change, so cached responses expire after one hour (3600 s).
    cache_session = requests_cache.CachedSession('.cache', expire_after=3600)
    retry_session = retry(cache_session, retries=5, backoff_factor=0.2)
    openmeteo = openmeteo_requests.Client(session=retry_session)

    # NOTE(review): the endpoint literal was empty in the source. This is the
    # documented general Open-Meteo forecast endpoint; if a model-specific
    # endpoint was intended, substitute it here.
    url = "https://api.open-meteo.com/v1/forecast"

    # Make sure all required weather variables are listed here.
    # The order of variables in "hourly" is important to assign them correctly below.
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "hourly": [
            "temperature_2m",
            "precipitation",
            "wind_speed_10m",
            "wind_direction_10m",
        ],
    }
    responses = openmeteo.weather_api(url, params=params)

    # Process first location. Add a for-loop for multiple locations or weather models
    response = responses[0]
    print(f"Coordinates {response.Latitude()}°N {response.Longitude()}°E")
    print(f"Elevation {response.Elevation()} m asl")
    print(f"Timezone {response.Timezone()} {response.TimezoneAbbreviation()}")
    print(f"Timezone difference to GMT+0 {response.UtcOffsetSeconds()} s")

    # Process hourly data. The order of variables needs to be the same as requested.
    hourly = response.Hourly()
    hourly_temperature_2m = hourly.Variables(0).ValuesAsNumpy()
    hourly_precipitation = hourly.Variables(1).ValuesAsNumpy()
    hourly_wind_speed_10m = hourly.Variables(2).ValuesAsNumpy()
    hourly_wind_direction_10m = hourly.Variables(3).ValuesAsNumpy()

    # One timestamp per returned interval; "left" excludes the end bound itself.
    hourly_data = {
        "date": pd.date_range(
            start=pd.to_datetime(hourly.Time(), unit="s"),
            end=pd.to_datetime(hourly.TimeEnd(), unit="s"),
            freq=pd.Timedelta(seconds=hourly.Interval()),
            inclusive="left",
        )
    }
    # Column names deliberately mirror the daily historical frame.
    hourly_data["temperature_2m_mean"] = hourly_temperature_2m
    hourly_data["precipitation_sum"] = hourly_precipitation
    hourly_data["wind_speed_10m_max"] = hourly_wind_speed_10m
    hourly_data["wind_direction_10m_dominant"] = hourly_wind_direction_10m

    hourly_dataframe = pd.DataFrame(data=hourly_data)
    hourly_dataframe = hourly_dataframe.dropna()
    return hourly_dataframe
def get_city_coordinates(city_name: str):
    """
    Takes city name and returns its latitude and longitude (rounded to 2 digits after dot).

    Args:
        city_name: Free-text place name handed to the Nominatim geocoder.

    Returns:
        A ``(latitude, longitude)`` tuple, each rounded to two decimals.

    Raises:
        ValueError: If the geocoder returns no match for ``city_name``.
    """
    # Initialize Nominatim API (for getting lat and long of the city)
    geolocator = Nominatim(user_agent="MyApp")
    location = geolocator.geocode(city_name)

    # geocode() yields None when nothing matches; fail with a clear message
    # instead of an AttributeError on the attribute access below.
    if location is None:
        raise ValueError(f"Could not find coordinates for city: {city_name!r}")

    latitude = round(location.latitude, 2)
    longitude = round(location.longitude, 2)
    return latitude, longitude
def trigger_request(url: str):
    """Send a GET request to ``url`` and return the decoded JSON body.

    Args:
        url: Full URL to fetch.

    Returns:
        The parsed JSON payload of the response.

    Raises:
        requests.exceptions.RequestException: If the response status code is
            not 200; the status code is passed as the exception argument.
    """
    response = requests.get(url)

    # Only a 200 response carries usable data; anything else is reported and raised.
    if response.status_code != 200:
        print("Failed to retrieve data. Status Code:", response.status_code)
        raise requests.exceptions.RequestException(response.status_code)

    # Extract the JSON content from the response
    return response.json()
def get_pm25(aqicn_url: str, country: str, city: str, street: str, day: datetime.date, AQI_API_KEY: str):
    """
    Returns DataFrame with air quality (pm25) as dataframe

    Args:
        aqicn_url: Feed URL of the sensor; the token is appended to it.
        country: Country label; also used to build the fallback URLs.
        city: City label; also used in the second fallback URL.
        street: Street label; also used to build the fallback URLs.
        day: Date stored in the ``date`` column. It is passed through
            ``pd.to_datetime``, so anything that function accepts works.
        AQI_API_KEY: API token sent as the ``token`` query parameter.

    Returns:
        A single-row DataFrame with the columns ``pm25`` (float32; NaN when
        the response has no pm25 reading), ``country``, ``city``, ``street``,
        ``date`` and ``url``.

    Raises:
        requests.exceptions.RequestException: If a request fails or the final
            response status is not ``'ok'``.
    """
    # NOTE(review): the fallback URLs had no scheme/host in the source. This is
    # the documented WAQI feed base - confirm it matches the sensor URLs in use.
    feed_base = "https://api.waqi.info/feed"

    # The API endpoint URL
    url = f"{aqicn_url}/?token={AQI_API_KEY}"

    # Make a GET request to fetch the data from the API
    data = trigger_request(url)

    # if we get 'Unknown station' response then retry with country/street in url
    if data['data'] == "Unknown station":
        url1 = f"{feed_base}/{country}/{street}/?token={AQI_API_KEY}"
        data = trigger_request(url1)

    # still unknown: retry once more with the city included
    if data['data'] == "Unknown station":
        url2 = f"{feed_base}/{country}/{city}/{street}/?token={AQI_API_KEY}"
        data = trigger_request(url2)

    # Check if the API response contains the data
    if data['status'] != 'ok':
        print("Error: There may be an incorrect URL for your Sensor or it is not contactable right now. The API response does not contain data. Error message:", data['data'])
        raise requests.exceptions.RequestException(data['data'])

    # Extract the air quality data
    aqi_data = data['data']
    aq_today_df = pd.DataFrame()
    # A missing pm25 reading becomes None here and NaN after the float cast.
    aq_today_df['pm25'] = [aqi_data['iaqi'].get('pm25', {}).get('v', None)]
    aq_today_df['pm25'] = aq_today_df['pm25'].astype('float32')
    aq_today_df['country'] = country
    aq_today_df['city'] = city
    aq_today_df['street'] = street
    aq_today_df['date'] = day
    aq_today_df['date'] = pd.to_datetime(aq_today_df['date'])
    aq_today_df['url'] = aqicn_url

    return aq_today_df
def plot_air_quality_forecast(city: str, street: str, df: pd.DataFrame, file_path: str, hindcast=False):
    """Plot predicted (and optionally actual) PM2.5 on a log scale and save it.

    Args:
        city: City name shown in the plot title.
        street: Street name shown in the plot title.
        df: Data to plot. Must contain ``date`` and ``predicted_pm25``; must
            also contain ``pm25`` when ``hindcast`` is true.
        file_path: Path the figure is written to; an existing file is
            overwritten.
        hindcast: When true, the actual ``pm25`` values are drawn as a second
            line and a line legend is added.

    Returns:
        The ``matplotlib.pyplot`` module, so callers can show or close the
        current figure.
    """
    # Local import: this name is not among the module-level imports.
    from matplotlib.ticker import ScalarFormatter

    fig, ax = plt.subplots(figsize=(10, 6))

    day = pd.to_datetime(df['date'])
    # Plot each column separately in matplotlib
    ax.plot(day, df['predicted_pm25'], label='Predicted PM2.5', color='red', linewidth=2, marker='o', markersize=5, markerfacecolor='blue')

    # Set the y-axis to a logarithmic scale
    ax.set_yscale('log')
    ax.set_yticks([0, 10, 25, 50, 100, 250, 500])
    # Plain numbers instead of powers of ten on the tick labels.
    ax.yaxis.set_major_formatter(ScalarFormatter())
    # A log axis cannot show 0, where the first category band starts.
    ax.set_ylim(bottom=1)

    # Set the labels and title
    ax.set_xlabel('Date')
    ax.set_title(f"PM2.5 Predicted (Logarithmic Scale) for {city}, {street}")
    ax.set_ylabel('PM2.5')

    colors = ['green', 'yellow', 'orange', 'red', 'purple', 'darkred']
    labels = ['Good', 'Moderate', 'Unhealthy for Some', 'Unhealthy', 'Very Unhealthy', 'Hazardous']
    ranges = [(0, 49), (50, 99), (100, 149), (150, 199), (200, 299), (300, 500)]
    for color, (start, end) in zip(colors, ranges):
        ax.axhspan(start, end, color=color, alpha=0.3)

    # Add a legend for the different Air Quality Categories
    patches = [
        Patch(color=color, label=f"{label}: {low}-{high}")
        for color, label, (low, high) in zip(colors, labels, ranges)
    ]
    legend1 = ax.legend(handles=patches, loc='upper right', title="Air Quality Categories", fontsize='x-small')

    # Aim for ~10 annotated values on x-axis, will work for both forecasts and hindcasts
    if len(df.index) > 11:
        every_x_tick = len(df.index) / 10
        ax.xaxis.set_major_locator(MultipleLocator(every_x_tick))

    if hindcast:
        ax.plot(day, df['pm25'], label='Actual PM2.5', color='black', linewidth=2, marker='^', markersize=5, markerfacecolor='grey')
        legend2 = ax.legend(loc='upper left', fontsize='x-small')
        # A second ax.legend() call replaces the first legend, so re-attach it.
        ax.add_artist(legend1)

    # Ensure everything is laid out neatly
    plt.tight_layout()

    # Save the figure, overwriting any existing file with the same name
    plt.savefig(file_path)
    return plt
def delete_feature_groups(fs, name):
    """Delete every version of the feature group called ``name``.

    Args:
        fs: Feature store handle exposing ``get_feature_groups(name)``.
        name: Name of the feature group whose versions are deleted.

    A ``RestAPIError`` from the feature store is treated as "no such feature
    group": it is reported on stdout and not re-raised.
    """
    try:
        for fg in fs.get_feature_groups(name):
            fg.delete()
            print(f"Deleted {name}/{fg.version}")
    except hsfs.client.exceptions.RestAPIError:
        print(f"No {name} feature group found")
def delete_feature_views(fs, name):
    """Delete every version of the feature view called ``name``.

    Args:
        fs: Feature store handle exposing ``get_feature_views(name)``.
        name: Name of the feature view whose versions are deleted.

    A ``RestAPIError`` from the feature store is treated as "no such feature
    view": it is reported on stdout and not re-raised.
    """
    try:
        for fv in fs.get_feature_views(name):
            fv.delete()
            print(f"Deleted {name}/{fv.version}")
    except hsfs.client.exceptions.RestAPIError:
        print(f"No {name} feature view found")
def delete_models(mr, name):
    """Delete every version of the model called ``name`` from the registry.

    Args:
        mr: Model registry handle exposing ``get_models(name)``.
        name: Name of the model whose versions are deleted.

    When the registry returns nothing for ``name``, a message is printed and
    the function returns without deleting anything.
    """
    models = mr.get_models(name)
    if not models:
        print(f"No {name} model found")
        # Nothing to iterate; this also covers a None result.
        return

    for model in models:
        model.delete()
        print(f"Deleted model {name}/{model.version}")
def delete_secrets(proj, name):
    """Delete the secret called ``name``.

    Args:
        proj: Project handle; its ``name`` is used to open the secrets API.
        name: Name of the secret to delete.

    A ``RestAPIError`` while fetching or deleting the secret is treated as
    "no such secret": it is reported on stdout and not re-raised.
    """
    # NOTE(review): the argument was cut off in the source. secrets_api()
    # forwards its argument as the connection's `project=` value, so the
    # project name is passed here - confirm against the caller.
    secrets = secrets_api(proj.name)
    try:
        secret = secrets.get_secret(name)
        secret.delete()
        print(f"Deleted secret {name}")
    except hopsworks.client.exceptions.RestAPIError:
        print(f"No {name} secret found")
# WARNING - this will wipe out all your feature data and models
def purge_project(proj):
    """Remove the air-quality feature view, feature groups, model and secret.

    WARNING: this wipes the feature data and models named below from the
    given project.

    Args:
        proj: Project handle exposing ``get_feature_store()`` and
            ``get_model_registry()``.
    """
    feature_store = proj.get_feature_store()
    model_registry = proj.get_model_registry()

    # Delete Feature Views before deleting the feature groups
    delete_feature_views(feature_store, "air_quality_fv")

    # Delete ALL Feature Groups, in the same order as before
    for group_name in ("air_quality", "weather", "aq_predictions"):
        delete_feature_groups(feature_store, group_name)

    # Delete all Models
    delete_models(model_registry, "air_quality_xgboost_model")

    delete_secrets(proj, "SENSOR_LOCATION_JSON")
def secrets_api(proj):
    """Open a Hopsworks connection and return its secrets API handle.

    Args:
        proj: Value forwarded as the connection's ``project`` argument
            (presumably the project name; verify against callers).

    Returns:
        Whatever ``conn.get_secrets_api()`` returns for the new connection.

    The API key is read from the ``HOPSWORKS_API_KEY`` environment variable.
    """
    # NOTE(review): the host literal was empty in the source. It is now read
    # from HOPSWORKS_HOST, falling back to the Hopsworks serverless host -
    # confirm this default matches the deployment in use.
    host = os.environ.get('HOPSWORKS_HOST', "c.app.hopsworks.ai")
    api_key = os.environ.get('HOPSWORKS_API_KEY')
    conn = hopsworks.connection(host=host, project=proj, api_key_value=api_key)
    return conn.get_secrets_api()
def check_file_path(file_path):
    """Print whether ``file_path`` points to an existing regular file.

    Args:
        file_path: Path to check.

    Exactly one message is printed: an error when the file is missing, a
    success message otherwise. Nothing is returned or raised.
    """
    my_file = Path(file_path)
    if not my_file.is_file():
        print(f"Error. File not found at the path: {file_path} ")
    else:
        print(f"File successfully found at the path: {file_path}")
def backfill_predictions_for_monitoring(weather_fg, air_quality_df, monitor_fg, model):
    """Predict PM2.5 for the 10 most recent weather rows and store them for monitoring.

    Args:
        weather_fg: Weather feature group; its ``read()`` result must contain
            a timezone-aware ``date`` column and the four weather feature
            columns used below.
        air_quality_df: Observed air quality with the columns ``date``,
            ``pm25``, ``street`` and ``country``. Its ``date`` column is
            converted to datetimes in place.
        monitor_fg: Feature group that receives the predictions via
            ``insert``.
        model: Object with a ``predict`` method taking the feature columns.

    Returns:
        The merged hindcast DataFrame, which still contains the observed
        ``pm25`` column. The frame inserted into ``monitor_fg`` is the same
        data without ``pm25``.
    """
    feature_columns = [
        'temperature_2m_mean',
        'precipitation_sum',
        'wind_speed_10m_max',
        'wind_direction_10m_dominant',
    ]

    # NOTE(review): the right-hand side was missing in the source. Reading the
    # weather feature group is the only use for the otherwise unused
    # `weather_fg` argument - confirm.
    features_df = weather_fg.read()
    features_df = features_df.sort_values(by=['date'], ascending=True)
    # Copy so the column assignments below do not write into a slice of the sorted frame.
    features_df = features_df.tail(10).copy()
    features_df['predicted_pm25'] = model.predict(features_df[feature_columns])

    # Bring both date columns to timezone-naive datetimes so the merge keys match.
    air_quality_df['date'] = pd.to_datetime(air_quality_df['date'])
    features_df['date'] = features_df['date'].dt.tz_convert(None).astype('datetime64[ns]')

    df = pd.merge(features_df, air_quality_df[['date', 'pm25', 'street', 'country']], on="date")
    df['days_before_forecast_day'] = 1

    # Keep the frame with the observed values for the caller; drop() returns a
    # new frame, so hindcast_df keeps its pm25 column.
    hindcast_df = df
    df = df.drop('pm25', axis=1)
    monitor_fg.insert(df, write_options={"wait_for_job": True})
    return hindcast_df