import datetime import pandas as pd from xgboost import XGBRegressor import hopsworks import json from functions import util import os # Set up api_key = os.getenv('HOPSWORKS_API_KEY') project_name = os.getenv('HOPSWORKS_PROJECT') project = hopsworks.login(project=project_name, api_key_value=api_key) fs = project.get_feature_store() secrets = util.secrets_api(project.name) location_str = secrets.get_secret("SENSOR_LOCATION_JSON").value location = json.loads(location_str) country=location['country'] city=location['city'] street=location['street'] AQI_API_KEY = secrets.get_secret("AQI_API_KEY").value location_str = secrets.get_secret("SENSOR_LOCATION_JSON").value location = json.loads(location_str) today = datetime.datetime.now() - datetime.timedelta(0) feature_view = fs.get_feature_view( name='air_quality_fv', version=1, ) ### Retreive model mr = project.get_model_registry() retrieved_model = mr.get_model( name="air_quality_xgboost_model", version=1, ) saved_model_dir = retrieved_model.download() retrieved_xgboost_model = XGBRegressor() retrieved_xgboost_model.load_model(saved_model_dir + "/model.json") ### Retrieve features weather_fg = fs.get_feature_group( name='weather', version=1, ) today_timestamp = pd.to_datetime(today) batch_data = weather_fg.filter(weather_fg.date >= today_timestamp ).read() ### Predict and upload batch_data['predicted_pm25'] = retrieved_xgboost_model.predict( batch_data[['temperature_2m_mean', 'precipitation_sum', 'wind_speed_10m_max', 'wind_direction_10m_dominant']]) batch_data['street'] = street batch_data['city'] = city batch_data['country'] = country # Fill in the number of days before the date on which you made the forecast (base_date) batch_data['days_before_forecast_day'] = range(1, len(batch_data)+1) batch_data = batch_data.sort_values(by=['date']) #batch_data['date'] = batch_data['date'].dt.tz_convert(None).astype('datetime64[ns]') plt = util.plot_air_quality_forecast(city, street, batch_data, file_path="./img/pm25_forecast.png") monitor_fg = fs.get_or_create_feature_group( name='aq_predictions', description='Air Quality prediction monitoring', version=1, primary_key=['city','street','date','days_before_forecast_day'], event_time="date" ) print(f"Batch data: {batch_data}") monitor_fg.insert(batch_data, write_options={"wait_for_job": True}) monitoring_df = monitor_fg.filter(monitor_fg.days_before_forecast_day == 1).read() # Hindcast monitoring air_quality_fg = fs.get_feature_group( name='air_quality', version=1, ) air_quality_df = air_quality_fg.read() outcome_df = air_quality_df[['date', 'pm25']] preds_df = monitoring_df[['date', 'predicted_pm25']] hindcast_df = pd.merge(preds_df, outcome_df, on="date") hindcast_df = hindcast_df.sort_values(by=['date']) if len(hindcast_df) == 0: hindcast_df = util.backfill_predictions_for_monitoring(weather_fg, air_quality_df, monitor_fg, retrieved_xgboost_model) plt = util.plot_air_quality_forecast(city, street, hindcast_df, file_path="./img/pm25_hindcast_1day.png", hindcast=True)