{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Connected. Call `.close()` to terminate connection gracefully.\n",
      "\n",
      "Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1160340\n",
      "2024-11-21 05:38:56,037 WARNING: using legacy validation callback\n",
      "Connected. Call `.close()` to terminate connection gracefully.\n",
      "Connected. Call `.close()` to terminate connection gracefully.\n",
      "Deleted air_quality_fv/1\n",
      "Deleted air_quality/1\n",
      "Deleted weather/1\n",
      "Deleted aq_predictions/1\n",
      "Deleted model air_quality_xgboost_model/1\n",
      "Connected. Call `.close()` to terminate connection gracefully.\n",
      "No SENSOR_LOCATION_JSON secret found\n"
     ]
    }
   ],
   "source": [
    "# Backfill setup: imports, credentials read from the environment, Hopsworks login.\n",
    "import datetime\n",
    "import json\n",
    "import os\n",
    "import re\n",
    "import warnings\n",
    "from pathlib import Path\n",
    "\n",
    "import hopsworks\n",
    "import pandas as pd\n",
    "import requests\n",
    "\n",
    "from functions import util\n",
    "\n",
    "warnings.filterwarnings(\"ignore\")\n",
    "\n",
    "# Keys and project name come from environment variables, not from the notebook.\n",
    "AQI_API_KEY = os.getenv('AQI_API_KEY')\n",
    "api_key = os.getenv('HOPSWORKS_API_KEY')\n",
    "project_name = os.getenv('HOPSWORKS_PROJECT')\n",
    "project = hopsworks.login(project=project_name, api_key_value=api_key)\n",
    "\n",
    "# Destructive: the recorded run deleted feature groups, the feature view and the model.\n",
    "util.purge_project(project)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "File successfully found at the path: data/lahore.csv\n"
     ]
    }
   ],
   "source": [
    "# Sensor and location configuration used by every later cell.\n",
    "country = \"pakistan\"\n",
    "city = \"lahore\"\n",
    "street = \"pakistan-lahore-cantonment\"\n",
    "aqicn_url = \"https://api.waqi.info/feed/A74005\"\n",
    "csv_file = \"data/lahore.csv\"\n",
    "\n",
    "latitude, longitude = util.get_city_coordinates(city)\n",
    "today = datetime.date.today()\n",
    "\n",
    "util.check_file_path(csv_file)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Connected. Call `.close()` to terminate connection gracefully.\n"
     ]
    }
   ],
   "source": [
    "# Store the AQI key as a Hopsworks secret; if creation is rejected, use the stored value.\n",
    "secrets = util.secrets_api(project.name)\n",
    "try:\n",
    "    secrets.create_secret(\"AQI_API_KEY\", AQI_API_KEY)\n",
    "except hopsworks.RestAPIError:\n",
    "    AQI_API_KEY = secrets.get_secret(\"AQI_API_KEY\").value"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Smoke-test the sensor URL and API key with today's reading before the backfill.\n",
    "# NOTE(review): util.get_pm25 is not visible here; HTTP errors from `requests` are\n",
    "# caught as well on the assumption that it fetches the reading over HTTP - confirm.\n",
    "try:\n",
    "    aq_today_df = util.get_pm25(aqicn_url, country, city, street, today, AQI_API_KEY)\n",
    "except (hopsworks.RestAPIError, requests.exceptions.RequestException):\n",
    "    print(\"It looks like the AQI_API_KEY doesn't work for your sensor. Is the API key correct? Is the sensor URL correct?\")\n",
    "    # Re-raise so the run stops here instead of continuing with an unverified key/URL.\n",
    "    raise"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n",
      "RangeIndex: 1802 entries, 0 to 1801\n",
      "Data columns (total 2 columns):\n",
      " # Column Non-Null Count Dtype \n",
      "--- ------ -------------- ----- \n",
      " 0 date 1802 non-null object \n",
      " 1 pm25 1802 non-null float32\n",
      "dtypes: float32(1), object(1)\n",
      "memory usage: 21.2+ KB\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "'2019-12-09'"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Load the historical PM2.5 readings and derive the rolling-mean feature.\n",
    "df = pd.read_csv(csv_file, parse_dates=['date'], skipinitialspace=True)\n",
    "\n",
    "# The reading column may be called `median` in the CSV; the rename is a no-op otherwise.\n",
    "df = df.rename(columns={\"median\": \"pm25\"})\n",
    "df['date'] = pd.to_datetime(df['date']).dt.date\n",
    "\n",
    "# .copy() gives an independent frame, so the column writes below never target a\n",
    "# slice of `df` (the chained-assignment case pandas warns about).\n",
    "df_aq = df[['date', 'pm25']].copy()\n",
    "df_aq['pm25'] = df_aq['pm25'].astype('float32')\n",
    "df_aq.info()\n",
    "df_aq = df_aq.dropna()\n",
    "df_aq = df_aq.assign(country=country, city=city, street=street, url=aqicn_url)\n",
    "\n",
    "# rolling() works on row order, so put the rows in chronological order first.\n",
    "df_aq = df_aq.sort_values('date').reset_index(drop=True)\n",
    "\n",
    "# NOTE(review): the 3-row window includes the current day's pm25. If pm25 is the\n",
    "# prediction target, confirm whether this feature should be shifted by one day.\n",
    "df_aq['past_air_quality'] = df_aq['pm25'].rolling(3).mean()\n",
    "# The first two rows have no full window; fill them with the overall rolling mean.\n",
    "df_aq['past_air_quality'] = df_aq['past_air_quality'].fillna(df_aq['past_air_quality'].mean())\n",
    "\n",
    "# Start date for the weather backfill, formatted as YYYY-MM-DD.\n",
    "earliest_aq_date = df_aq['date'].min().strftime('%Y-%m-%d')\n",
    "earliest_aq_date"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "datetime.date(2024, 11, 20)"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "today"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Coordinates 31.59929656982422°N 74.26347351074219°E\n",
      "Elevation 215.0 m asl\n",
      "Timezone None None\n",
      "Timezone difference to GMT+0 0 s\n",
      "\n",
      "Index: 1807 entries, 0 to 1806\n",
      "Data columns (total 6 columns):\n",
      " # Column Non-Null Count Dtype \n",
      "--- ------ -------------- ----- \n",
      " 0 date 1807 non-null datetime64[ns]\n",
      " 1 temperature_2m_mean 1807 non-null float32 \n",
      " 2 precipitation_sum 1807 non-null float32 \n",
      " 3 wind_speed_10m_max 1807 non-null float32 \n",
      " 4 wind_direction_10m_dominant 1807 non-null float32 \n",
      " 5 city 1807 non-null object \n",
      "dtypes: datetime64[ns](1), float32(4), object(1)\n",
      "memory usage: 70.6+ KB\n"
     ]
    }
   ],
   "source": [
    "# Historical weather from the earliest air-quality date through yesterday.\n",
    "yesterday = today - datetime.timedelta(days=1)\n",
    "weather_df = util.get_historical_weather(city, earliest_aq_date, str(yesterday), latitude, longitude)\n",
    "weather_df.info()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "{\"expectation_type\": \"expect_column_min_to_be_between\", \"kwargs\": {\"column\": \"pm25\", \"min_value\": -0.1, \"max_value\": 500.0, \"strict_min\": true}, \"meta\": {}}"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "\n",
    "import 
great_expectations as ge\n",
    "\n",
    "# Validation suite for the air-quality data.\n",
    "aq_expectation_suite = ge.core.ExpectationSuite(expectation_suite_name=\"aq_expectation_suite\")\n",
    "\n",
    "# Bounds on the column minimum of pm25: -0.1 (strict_min enabled) up to 500.0.\n",
    "pm25_min_check = ge.core.ExpectationConfiguration(\n",
    "    expectation_type=\"expect_column_min_to_be_between\",\n",
    "    kwargs={\"column\": \"pm25\", \"min_value\": -0.1, \"max_value\": 500.0, \"strict_min\": True},\n",
    ")\n",
    "aq_expectation_suite.add_expectation(pm25_min_check)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "import great_expectations as ge\n",
    "\n",
    "# Validation suite for the weather data.\n",
    "weather_expectation_suite = ge.core.ExpectationSuite(expectation_suite_name=\"weather_expectation_suite\")\n",
    "\n",
    "\n",
    "def expect_greater_than_zero(col):\n",
    "    \"\"\"Add a column-minimum check for `col` to `weather_expectation_suite`.\n",
    "\n",
    "    The check is `expect_column_min_to_be_between` with bounds -0.1 (strict_min\n",
    "    enabled) and 1000.0. The module-level suite is modified in place.\n",
    "    \"\"\"\n",
    "    min_check = ge.core.ExpectationConfiguration(\n",
    "        expectation_type=\"expect_column_min_to_be_between\",\n",
    "        kwargs={\"column\": col, \"min_value\": -0.1, \"max_value\": 1000.0, \"strict_min\": True},\n",
    "    )\n",
    "    weather_expectation_suite.add_expectation(min_check)\n",
    "\n",
    "\n",
    "for weather_col in (\"precipitation_sum\", \"wind_speed_10m_max\"):\n",
    "    expect_greater_than_zero(weather_col)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Connected. 
Call `.close()` to terminate connection gracefully.\n"
     ]
    }
   ],
   "source": [
    "# Handle to the project's feature store, used to create the feature groups.\n",
    "fs = project.get_feature_store()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets\n"
     ]
    }
   ],
   "source": [
    "# Sensor location bundled as a JSON string and stored as a Hopsworks secret.\n",
    "dict_obj = dict(\n",
    "    country=country,\n",
    "    city=city,\n",
    "    street=street,\n",
    "    aqicn_url=aqicn_url,\n",
    "    latitude=latitude,\n",
    "    longitude=longitude,\n",
    ")\n",
    "str_dict = json.dumps(dict_obj)\n",
    "\n",
    "try:\n",
    "    secrets.create_secret(\"SENSOR_LOCATION_JSON\", str_dict)\n",
    "except hopsworks.RestAPIError:\n",
    "    # Creation was rejected: report it and show the value that is already stored.\n",
    "    print(\"SENSOR_LOCATION_JSON already exists. To update, delete the secret in the UI (https://c.app.hopsworks.ai/account/secrets) and re-run this cell.\")\n",
    "    existing_key = secrets.get_secret(\"SENSOR_LOCATION_JSON\").value\n",
    "    print(f\"{existing_key}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Air-quality feature group keyed by city, street and date, validated with aq_expectation_suite.\n",
    "air_quality_fg = fs.get_or_create_feature_group(\n",
    "    name=\"air_quality\",\n",
    "    version=1,\n",
    "    description=\"Air Quality characteristics of each day\",\n",
    "    primary_key=[\"city\", \"street\", \"date\"],\n",
    "    event_time=\"date\",\n",
    "    expectation_suite=aq_expectation_suite,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Feature Group created successfully, explore it at \n",
      "https://c.app.hopsworks.ai:443/p/1160340/fs/1151043/fg/1362254\n",
      "2024-11-21 05:44:54,527 INFO: \t1 expectation(s) included in expectation_suite.\n",
      "Validation succeeded.\n",
      "Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1160340/fs/1151043/fg/1362254\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "506badbe42224a17b3ccc6d6b1ae7927",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Uploading Dataframe: 0.00% | | Rows 0/1802 | Elapsed Time: 00:00 | Remaining Time: ?"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Launching job: air_quality_1_offline_fg_materialization\n",
      "Job started successfully, you can follow the progress at \n",
      "https://c.app.hopsworks.ai/p/1160340/jobs/named/air_quality_1_offline_fg_materialization/executions\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "(,\n",
       " {\n",
       " \"success\": true,\n",
       " \"results\": [\n",
       " {\n",
       " \"success\": true,\n",
       " \"expectation_config\": {\n",
       " \"expectation_type\": \"expect_column_min_to_be_between\",\n",
       " \"kwargs\": {\n",
       " \"column\": \"pm25\",\n",
       " \"min_value\": -0.1,\n",
       " \"max_value\": 500.0,\n",
       " \"strict_min\": true\n",
       " },\n",
       " \"meta\": {\n",
       " \"expectationId\": 686087\n",
       " }\n",
       " },\n",
       " \"result\": {\n",
       " \"observed_value\": 1.9899998903274536,\n",
       " \"element_count\": 1802,\n",
       " \"missing_count\": null,\n",
       " \"missing_percent\": null\n",
       " },\n",
       " \"meta\": {\n",
       " \"ingestionResult\": \"INGESTED\",\n",
       " \"validationTime\": \"2024-11-20T09:44:54.000525Z\"\n",
       " },\n",
       " \"exception_info\": {\n",
       " \"raised_exception\": false,\n",
       " \"exception_message\": null,\n",
       " \"exception_traceback\": null\n",
       " }\n",
       " }\n",
       " ],\n",
       " \"evaluation_parameters\": {},\n",
       " \"statistics\": {\n",
       " \"evaluated_expectations\": 1,\n",
       " \"successful_expectations\": 1,\n",
       " \"unsuccessful_expectations\": 0,\n",
       " \"success_percent\": 100.0\n",
       " },\n",
       " \"meta\": {\n",
       " \"great_expectations_version\": \"0.18.12\",\n",
       " \"expectation_suite_name\": \"aq_expectation_suite\",\n",
       " \"run_id\": {\n",
       " \"run_name\": null,\n",
       " \"run_time\": \"2024-11-21T05:44:54.526004+08:00\"\n",
       " },\n",
       " \"batch_kwargs\": {\n",
       " \"ge_batch_id\": \"adcf6d76-a788-11ef-a237-1091d10619ea\"\n",
       " },\n",
       " \"batch_markers\": {},\n",
       " \"batch_parameters\": {},\n",
       " \"validation_time\": \"20241120T214454.525505Z\",\n",
       " \"expectation_suite_meta\": {\n",
       " \"great_expectations_version\": \"0.18.12\"\n",
       " }\n",
       " }\n",
       " })"
      ]
     },
     "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Upload the historical air-quality rows; the recorded output shows the validation report.\n",
    "air_quality_fg.insert(df_aq)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       ""
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Per-feature descriptions attached to the air_quality feature group.\n",
    "air_quality_fg.update_feature_description(\"date\", \"Date of measurement of air quality\")\n",
    "air_quality_fg.update_feature_description(\"country\", \"Country where the air quality was measured (sometimes a city in aqicn.org)\")\n",
    "air_quality_fg.update_feature_description(\"city\", \"City where the air quality was measured\")\n",
    "air_quality_fg.update_feature_description(\"street\", \"Street in the city where the air quality was measured\")\n",
    "air_quality_fg.update_feature_description(\"pm25\", \"Particles less than 2.5 micrometers in diameter (fine particles) pose health risk\")\n",
    "air_quality_fg.update_feature_description(\"past_air_quality\", \"mean air quality of the past 3 days\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Feature Group created successfully, explore it at \n",
      "https://c.app.hopsworks.ai:443/p/1160340/fs/1151043/fg/1362255\n",
      "2024-11-21 05:56:51,769 INFO: \t2 expectation(s) included in expectation_suite.\n",
      "Validation succeeded.\n",
      "Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1160340/fs/1151043/fg/1362255\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "455439f2dd8643b4b06da1d3851d2f8c",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "Uploading Dataframe: 0.00% | | Rows 0/1807 | Elapsed Time: 00:00 | Remaining Time: ?"
]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Launching job: weather_1_offline_fg_materialization\n",
      "Job started successfully, you can follow the progress at \n",
      "https://c.app.hopsworks.ai/p/1160340/jobs/named/weather_1_offline_fg_materialization/executions\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       ""
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Weather feature group keyed by city and date, validated with weather_expectation_suite.\n",
    "weather_fg = fs.get_or_create_feature_group(\n",
    "    name=\"weather\",\n",
    "    version=1,\n",
    "    description=\"Weather characteristics of each day\",\n",
    "    primary_key=[\"city\", \"date\"],\n",
    "    event_time=\"date\",\n",
    "    expectation_suite=weather_expectation_suite,\n",
    ")\n",
    "\n",
    "# Upload the historical weather rows before describing the features.\n",
    "weather_fg.insert(weather_df)\n",
    "\n",
    "# Per-feature descriptions attached to the weather feature group.\n",
    "weather_fg.update_feature_description(\"date\", \"Date of measurement of weather\")\n",
    "weather_fg.update_feature_description(\"city\", \"City where weather is measured/forecast for\")\n",
    "weather_fg.update_feature_description(\"temperature_2m_mean\", \"Temperature in Celsius\")\n",
    "weather_fg.update_feature_description(\"precipitation_sum\", \"Precipitation (rain/snow) in mm\")\n",
    "weather_fg.update_feature_description(\"wind_speed_10m_max\", \"Wind speed at 10m above ground\")\n",
    "weather_fg.update_feature_description(\"wind_direction_10m_dominant\", \"Dominant Wind direction over the day\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": ".venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}