In [1]:
import datetime
import requests
import pandas as pd
import hopsworks
import datetime
from pathlib import Path
from functions import util
import json
import re
import os
import warnings
# Silence all warnings so library chatter does not clutter the notebook output.
# NOTE(review): this also hides pandas SettingWithCopyWarning and deprecation notices.
warnings.filterwarnings("ignore")

# Every credential is read from the environment; nothing secret lives in the notebook.
project_name = os.getenv('HOPSWORKS_PROJECT')
api_key = os.getenv('HOPSWORKS_API_KEY')
AQI_API_KEY = os.getenv('AQI_API_KEY')

project = hopsworks.login(api_key_value=api_key, project=project_name)

# WARNING: destructive. Judging by this cell's output, it deletes the project's
# existing feature views, feature groups and models, so the backfill starts clean.
util.purge_project(project)

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1160340
Connected. Call `.close()` to terminate connection gracefully.
Connected. Call `.close()` to terminate connection gracefully.
Deleted air_quality_fv/1
Deleted air_quality/1
Deleted weather/1
Deleted aq_predictions/1
Deleted model air_quality_xgboost_model/1
Connected. Call `.close()` to terminate connection gracefully.
No SENSOR_LOCATION_JSON secret found


In [2]:
# Sensor identity: the AQICN station feed plus the place it reports for.
country = "pakistan"
city = "lahore"
street = "pakistan-lahore-cantonment"
aqicn_url = "https://api.waqi.info/feed/A74005"

# Historical readings for this sensor, exported as CSV.
csv_file = "data/lahore.csv"

today = datetime.date.today()
latitude, longitude = util.get_city_coordinates(city)

util.check_file_path(csv_file)

File successfully found at the path: data/lahore.csv


In [3]:
secrets = util.secrets_api(project.name)

# Store the AQI key from the environment as a Hopsworks secret. If a secret with
# this name already exists, creation fails and the stored value is used instead
# of the environment value.
try:
    secrets.create_secret("AQI_API_KEY", AQI_API_KEY)
except hopsworks.RestAPIError:
    stored_secret = secrets.get_secret("AQI_API_KEY")
    AQI_API_KEY = stored_secret.value

Connected. Call `.close()` to terminate connection gracefully.


In [4]:
# Fetch today's PM2.5 reading from the live AQICN feed for this sensor.
# NOTE(review): util.get_pm25 presumably queries api.waqi.info over HTTP via
# `requests`, so a bad key or URL most likely surfaces as a RequestException
# rather than a Hopsworks RestAPIError. Both are caught here; confirm in util.
try:
    aq_today_df = util.get_pm25(aqicn_url, country, city, street, today, AQI_API_KEY)
except (requests.exceptions.RequestException, hopsworks.RestAPIError):
    print("It looks like the AQI_API_KEY doesn't work for your sensor. Is the API key correct? Is the sensor URL correct?")
    # Re-raise: swallowing the error would leave `aq_today_df` undefined and
    # make the next cell fail with a confusing NameError.
    raise


In [None]:
# Preview today's reading fetched from the live AQICN feed in the previous cell.
from IPython.display import display

display(aq_today_df.head())

# Load the historical PM2.5 readings exported as CSV for this sensor.
df = pd.read_csv(csv_file, parse_dates=['date'], skipinitialspace=True)

# These commands will succeed if your CSV file didn't have a `median` or `timestamp` column
df = df.rename(columns={"median": "pm25"})
# df = df.rename(columns={"timestamp": "date"})
df['date'] = pd.to_datetime(df['date']).dt.date

# .copy() makes df_aq an independent frame, so the column assignments below never
# write into a slice of `df` (the SettingWithCopyWarning that would flag this is
# hidden by the warnings filter set in the first cell).
df_aq = df[['date', 'pm25']].copy()
df_aq['pm25'] = df_aq['pm25'].astype('float32')
df_aq.info()
df_aq = df_aq.dropna()
df_aq['country'] = country
df_aq['city'] = city
df_aq['street'] = street
df_aq['url'] = aqicn_url

# The rolling window below counts rows, so put them in chronological order first;
# nothing guarantees that the CSV export is sorted by date.
df_aq = df_aq.sort_values('date').reset_index(drop=True)

# past_air_quality = mean pm25 of the 3 readings *before* each day. shift(1) keeps
# the current day's pm25 (the prediction target) out of its own feature.
# NOTE: gaps in the record make the 3-row window span more than 3 calendar days.
df_aq['past_air_quality'] = df_aq['pm25'].shift(1).rolling(3).mean()
# The first rows have no full 3-reading history; fill them with the column mean.
df_aq['past_air_quality'] = df_aq['past_air_quality'].fillna(df_aq['past_air_quality'].mean())

# Earliest date in the air-quality history; the weather backfill starts from it.
earliest_aq_date = df_aq['date'].min().strftime('%Y-%m-%d')
earliest_aq_date


RangeIndex: 1802 entries, 0 to 1801
Data columns (total 2 columns):
 # Column Non-Null Count Dtype 
--- ------ -------------- ----- 
 0 date 1802 non-null object 
 1 pm25 1802 non-null float32
dtypes: float32(1), object(1)
memory usage: 21.2+ KB


'2019-12-09'

In [None]:
# Sanity check: the date this run treats as "today" (weather is backfilled up to the day before).
today

datetime.date(2024, 11, 20)

In [None]:
# Backfill daily weather for the whole air-quality history: from the first
# PM2.5 reading up to and including yesterday.
weather_end_date = str(today - datetime.timedelta(days=1))
weather_df = util.get_historical_weather(
    city,
    earliest_aq_date,
    weather_end_date,
    latitude,
    longitude,
)
weather_df.info()

Coordinates 31.59929656982422°N 74.26347351074219°E
Elevation 215.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s

Index: 1807 entries, 0 to 1806
Data columns (total 6 columns):
 # Column Non-Null Count Dtype 
--- ------ -------------- ----- 
 0 date 1807 non-null datetime64[ns]
 1 temperature_2m_mean 1807 non-null float32 
 2 precipitation_sum 1807 non-null float32 
 3 wind_speed_10m_max 1807 non-null float32 
 4 wind_direction_10m_dominant 1807 non-null float32 
 5 city 1807 non-null object 
dtypes: datetime64[ns](1), float32(4), object(1)
memory usage: 70.6+ KB


In [10]:

# Data-quality rule for the air_quality feature group: the smallest pm25 value in
# an inserted batch must be strictly greater than -0.1 (strict_min) and no
# greater than 500.0. In practice this rejects batches with negative readings.
import great_expectations as ge

aq_expectation_suite = ge.core.ExpectationSuite(expectation_suite_name="aq_expectation_suite")

pm25_min_check = ge.core.ExpectationConfiguration(
    expectation_type="expect_column_min_to_be_between",
    kwargs=dict(column="pm25", min_value=-0.1, max_value=500.0, strict_min=True),
)
aq_expectation_suite.add_expectation(pm25_min_check)


{"expectation_type": "expect_column_min_to_be_between", "kwargs": {"column": "pm25", "min_value": -0.1, "max_value": 500.0, "strict_min": true}, "meta": {}}

In [11]:
import great_expectations as ge

weather_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="weather_expectation_suite"
)


def expect_greater_than_zero(col):
    """Register a non-negativity check for ``col`` on the weather suite.

    Despite the name, zero is allowed: the column minimum must be strictly
    greater than -0.1 and no greater than 1000.0.

    Args:
        col: Name of the weather_df column to validate.
    """
    min_check = ge.core.ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs=dict(column=col, min_value=-0.1, max_value=1000.0, strict_min=True),
    )
    weather_expectation_suite.add_expectation(min_check)


for weather_column in ("precipitation_sum", "wind_speed_10m_max"):
    expect_greater_than_zero(weather_column)

In [12]:
# Handle to the project's feature store; the feature groups below are created in it.
fs = project.get_feature_store() 

Connected. Call `.close()` to terminate connection gracefully.


In [13]:
# Save the sensor's identity and coordinates as a JSON-encoded Hopsworks secret.
dict_obj = dict(
    country=country,
    city=city,
    street=street,
    aqicn_url=aqicn_url,
    latitude=latitude,
    longitude=longitude,
)
str_dict = json.dumps(dict_obj)

try:
    secrets.create_secret("SENSOR_LOCATION_JSON", str_dict)
except hopsworks.RestAPIError:
    # A secret with this name already exists: keep it and show what is stored.
    print("SENSOR_LOCATION_JSON already exists. To update, delete the secret in the UI (https://c.app.hopsworks.ai/account/secrets) and re-run this cell.")
    existing_key = secrets.get_secret("SENSOR_LOCATION_JSON").value
    print(existing_key)


Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets


In [15]:
# Air-quality feature group: one row per (city, street, date). Inserts are
# validated against aq_expectation_suite.
air_quality_fg = fs.get_or_create_feature_group(
    name="air_quality",
    version=1,
    description="Air Quality characteristics of each day",
    primary_key=["city", "street", "date"],
    event_time="date",
    expectation_suite=aq_expectation_suite,
)


In [16]:
# Backfill the historical air-quality rows. The feature group was created with
# aq_expectation_suite, so the batch is validated before upload.
air_quality_fg.insert(df_aq)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1160340/fs/1151043/fg/1362254
2024-11-21 05:44:54,527 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1160340/fs/1151043/fg/1362254


Uploading Dataframe: 0.00% | | Rows 0/1802 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: air_quality_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1160340/jobs/named/air_quality_1_offline_fg_materialization/executions


(,
 {
 "success": true,
 "results": [
 {
 "success": true,
 "expectation_config": {
 "expectation_type": "expect_column_min_to_be_between",
 "kwargs": {
 "column": "pm25",
 "min_value": -0.1,
 "max_value": 500.0,
 "strict_min": true
 },
 "meta": {
 "expectationId": 686087
 }
 },
 "result": {
 "observed_value": 1.9899998903274536,
 "element_count": 1802,
 "missing_count": null,
 "missing_percent": null
 },
 "meta": {
 "ingestionResult": "INGESTED",
 "validationTime": "2024-11-20T09:44:54.000525Z"
 },
 "exception_info": {
 "raised_exception": false,
 "exception_message": null,
 "exception_traceback": null
 }
 }
 ],
 "evaluation_parameters": {},
 "statistics": {
 "evaluated_expectations": 1,
 "successful_expectations": 1,
 "unsuccessful_expectations": 0,
 "success_percent": 100.0
 },
 "meta": {
 "great_expectations_version": "0.18.12",
 "expectation_suite_name": "aq_expectation_suite",
 "run_id": {
 "run_name": null,
 "run_time": "2024-11-21T05:44:54.526004+08:00"
 },
 "batch_kwargs": {
 

In [17]:
# Human-readable descriptions for every air_quality feature, shown in the Hopsworks UI.
# (Fixes the "acqcn.org" typo: the sensor data comes from aqicn.org.)
air_quality_descriptions = {
    "date": "Date of measurement of air quality",
    "country": "Country where the air quality was measured (sometimes a city in aqicn.org)",
    "city": "City where the air quality was measured",
    "street": "Street in the city where the air quality was measured",
    "pm25": "Particles less than 2.5 micrometers in diameter (fine particles) pose health risk",
    "past_air_quality": "mean air quality of the past 3 days",
}
for feature_name, description in air_quality_descriptions.items():
    air_quality_fg.update_feature_description(feature_name, description)




In [18]:
# Weather feature group: one row per (city, date). Inserts are validated
# against weather_expectation_suite.
weather_fg = fs.get_or_create_feature_group(
    name='weather',
    description='Weather characteristics of each day',
    version=1,
    primary_key=['city', 'date'],
    event_time="date",
    expectation_suite=weather_expectation_suite
)

weather_fg.insert(weather_df)

# Human-readable feature descriptions shown in the Hopsworks UI
# (fixes the "abouve" and "dayd" typos in the stored text).
weather_descriptions = {
    "date": "Date of measurement of weather",
    "city": "City where weather is measured/forecast for",
    "temperature_2m_mean": "Temperature in Celsius",
    "precipitation_sum": "Precipitation (rain/snow) in mm",
    "wind_speed_10m_max": "Maximum wind speed at 10m above ground",
    "wind_direction_10m_dominant": "Dominant Wind direction over the day",
}
for feature_name, description in weather_descriptions.items():
    weather_fg.update_feature_description(feature_name, description)




Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1160340/fs/1151043/fg/1362255
2024-11-21 05:56:51,769 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1160340/fs/1151043/fg/1362255


Uploading Dataframe: 0.00% | | Rows 0/1807 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: weather_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1160340/jobs/named/weather_1_offline_fg_materialization/executions


