# <span style="font-width:bold; font-size: 3rem; color:#1EB182;"><img src="../../images/icon102.png" width="38px"></img> **Hopsworks Feature Store** </span>

<span style="font-width:bold; font-size: 3rem; color:#333;">- Part 01: Backfill Features to the Feature Store</span>

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/logicalclocks/hopsworks-tutorials/blob/master/advanced_tutorials/air_quality/1_backfill_feature_groups.ipynb)


## üóíÔ∏è This notebook is divided into the following sections:
1. Fetch historical data
2. Connect to the Hopsworks feature store
3. Create feature groups and insert them to the feature store

![tutorial-flow](../../images/01_featuregroups.png)

### <span style='color:#ff5f27'> üìù Imports

In [2]:
%pip install geopy folium streamlit-folium geopy --q

In [3]:
import datetime
import time
import requests
from urllib.request import urlopen
import json
import pandas as pd
import folium
from functions import *
import warnings
warnings.filterwarnings("ignore")

---

## <span style='color:#ff5f27'> üåç Representing the Target cities </span>

In [4]:
target_url='https://repo.hops.works/dev/jdowling/target_cities.json'
response = urlopen(target_url)
target_cities = json.loads(response.read())


## <span style='color:#ff5f27'> üå´ Processing Air Quality data</span>

### [üá™üá∫ EEA](https://discomap.eea.europa.eu/map/fme/AirQualityExport.htm)
#### EEA means European Environmental Agency

In [5]:
target_cities["EU"]

{'Amsterdam': [52.37, 4.89],
 'Athina': [37.98, 23.73],
 'Berlin': [52.52, 13.39],
 'Gdansk': [54.37, 18.61],
 'Krak√≥w': [50.06, 19.94],
 'London': [51.51, -0.13],
 'Madrid': [40.42, -3.7],
 'Marseille': [43.3, 5.37],
 'Milano': [45.46, 9.19],
 'M√ºnchen': [48.14, 11.58],
 'Napoli': [40.84, 14.25],
 'Paris': [48.85, 2.35],
 'Sevilla': [37.39, -6.0],
 'Stockholm': [59.33, 18.07],
 'Tallinn': [59.44, 24.75],
 'Varna': [43.21, 27.92],
 'Wien': [48.21, 16.37]}

In [6]:
df_eu = pd.read_csv("data/backfill_pm2_5_eu.csv")

In [7]:
df_eu.isna().sum().sum()

0

In [8]:
print("Size of this dataframe:", df_eu.shape)

df_eu.sample(3)

Size of this dataframe: (63548, 3)


Unnamed: 0,city_name,date,pm2_5
16477,Krak√≥w,2017-01-05,16.0
12612,Gdansk,2016-09-15,10.0
58456,Varna,2018-12-03,11.0


### [üá∫üá∏ USEPA](https://aqs.epa.gov/aqsweb/documents/data_api.html#daily)
#### USEPA means United States Environmental Protection Agency
[Manual downloading](https://www.epa.gov/outdoor-air-quality-data/download-daily-data)



In [9]:
target_cities["US"]

{'Albuquerque': [35.08, -106.65],
 'Atlanta': [33.75, -84.39],
 'Chicago': [41.88, -87.62],
 'Columbus': [39.96, -83.0],
 'Dallas': [32.78, -96.8],
 'Denver': [39.74, -104.98],
 'Houston': [29.76, -95.37],
 'Los Angeles': [34.05, -118.24],
 'New York': [40.71, -74.01],
 'Phoenix-Mesa': [33.66, -112.04],
 'Salt Lake City': [40.76, -111.89],
 'San Francisco': [37.78, -122.42],
 'Tampa': [27.95, -82.46]}

In [10]:
df_us = pd.read_csv("data/backfill_pm2_5_us.csv")

In [11]:
df_us.isna().sum().sum()

0

In [12]:
print("Size of this dataframe:", df_us.shape)

df_us.sample(3)

Size of this dataframe: (46037, 3)


Unnamed: 0,date,city_name,pm2_5
39995,2016-05-09,San Francisco,7.3
18276,2016-04-10,Denver,3.1
32122,2014-10-17,Phoenix-Mesa,11.7


### <span style="color:#ff5f27;">üè¢ Processing special city - `Seattle`</span>
#### We need different stations across the Seattle. 
I downloaded daily `PM2.5` data manually [here](https://www.epa.gov/outdoor-air-quality-data/download-daily-data)

In [13]:
target_cities["Seattle"]

{'Bellevue-SE 12th St': [47.60086, -122.1484],
 'DARRINGTON - FIR ST (Darrington High School)': [48.2469, -121.6031],
 'KENT - JAMES & CENTRAL': [47.38611, -122.23028],
 'LAKE FOREST PARK TOWNE CENTER': [47.755, -122.2806],
 'MARYSVILLE - 7TH AVE (Marysville Junior High)': [48.05432, -122.17153],
 'NORTH BEND - NORTH BEND WAY': [47.49022, -121.77278],
 'SEATTLE - BEACON HILL': [47.56824, -122.30863],
 'SEATTLE - DUWAMISH': [47.55975, -122.33827],
 'SEATTLE - SOUTH PARK #2': [47.53091, -122.3208],
 'Seattle-10th & Weller': [47.59722, -122.31972],
 'TACOMA - ALEXANDER AVE': [47.2656, -122.3858],
 'TACOMA - L STREET': [47.1864, -122.4517],
 'Tacoma-S 36th St': [47.22634, -122.46256],
 'Tukwila Allentown': [47.49854, -122.27839],
 'Tulalip-Totem Beach Rd': [48.06534, -122.28519]}

In [14]:
df_seattle = pd.read_csv("data/backfill_pm2_5_seattle.csv")

In [15]:
df_seattle.isna().sum().sum()

0

In [16]:
print("Size of this dataframe:", df_seattle.shape)

df_seattle.sample(3)

Size of this dataframe: (46479, 3)


Unnamed: 0,city_name,date,pm2_5
3345,MARYSVILLE - 7TH AVE (Marysville Junior High),2013-05-03,5.3
22979,TACOMA - L STREET,2018-08-13,19.2
14456,DARRINGTON - FIR ST (Darrington High School),2016-11-09,8.4


In [17]:
df_seattle.city_name.value_counts()

city_name
NORTH BEND - NORTH BEND WAY                      3705
TACOMA - L STREET                                3696
SEATTLE - BEACON HILL                            3691
MARYSVILLE - 7TH AVE (Marysville Junior High)    3648
DARRINGTON - FIR ST (Darrington High School)     3614
SEATTLE - SOUTH PARK #2                          3577
TACOMA - ALEXANDER AVE                           3569
KENT - JAMES & CENTRAL                           3556
SEATTLE - DUWAMISH                               3439
Seattle-10th & Weller                            3097
LAKE FOREST PARK TOWNE CENTER                    2999
Tacoma-S 36th St                                 2574
Bellevue-SE 12th St                              2172
Tukwila Allentown                                2074
Tulalip-Totem Beach Rd                           1068
Name: count, dtype: int64

### <span style="color:#ff5f27;">üåü All together</span>

In [18]:
df_air_quality = pd.concat([df_eu, df_us, df_seattle]).reset_index(drop=True)

In [19]:
df_air_quality.sample(5)

Unnamed: 0,city_name,date,pm2_5
155596,Tacoma-S 36th St,2023-03-12,13.9
72851,Chicago,2018-07-04,10.3
150716,Bellevue-SE 12th St,2022-12-07,1.8
88999,Los Angeles,2016-07-11,10.5
127366,Tacoma-S 36th St,2017-12-01,4.6


In [20]:
df_air_quality.shape

(156064, 3)

In [21]:
df_air_quality.columns

Index(['city_name', 'date', 'pm2_5'], dtype='object')

---

## <span style='color:#ff5f27'> üå¶ Loading Weather Data from [Open Meteo](https://open-meteo.com/en/docs)

In [22]:
df_weather = pd.read_csv("data/backfill_weather.csv")

In [23]:
df_weather.city_name.value_counts()

city_name
Amsterdam                                        3767
Athina                                           3767
Berlin                                           3767
Gdansk                                           3767
Krak√≥w                                           3767
London                                           3767
Madrid                                           3767
Marseille                                        3767
Milano                                           3767
M√ºnchen                                          3767
Napoli                                           3767
Paris                                            3767
Sevilla                                          3767
Stockholm                                        3767
Tallinn                                          3767
Varna                                            3767
Wien                                             3767
Albuquerque                                      3767
Atlanta         

In [24]:
df_weather.sample(3)

Unnamed: 0,city_name,date,temperature_max,temperature_min,precipitation_sum,rain_sum,snowfall_sum,precipitation_hours,wind_speed_max,wind_gusts_max,wind_direction_dominant
56824,Varna,2014-03-01,9.4,5.5,2.6,2.6,0.0,7.0,13.2,22.7,150
146508,SEATTLE - SOUTH PARK #2,2022-12-08,5.6,1.8,7.9,7.6,0.21,15.0,18.1,38.9,285
53035,Tallinn,2014-01-31,-8.6,-17.0,1.0,0.0,0.98,3.0,29.6,55.8,158


---

In [25]:
df_air_quality.date = pd.to_datetime(df_air_quality.date)
df_weather.date = pd.to_datetime(df_weather.date)

df_air_quality["unix_time"] = df_air_quality["date"].apply(convert_date_to_unix)
df_weather["unix_time"] = df_weather["date"].apply(convert_date_to_unix)

In [26]:
df_air_quality.date = df_air_quality.date.astype(str)
df_weather.date = df_weather.date.astype(str)

In [27]:
df_air_quality

Unnamed: 0,city_name,date,pm2_5,unix_time
0,Amsterdam,2013-01-01,14.0,1356994800000
1,Amsterdam,2013-01-02,8.0,1357081200000
2,Amsterdam,2013-01-03,12.0,1357167600000
3,Amsterdam,2013-01-04,12.0,1357254000000
4,Amsterdam,2013-01-05,14.0,1357340400000
...,...,...,...,...
156059,MARYSVILLE - 7TH AVE (Marysville Junior High),2023-03-30,7.9,1680127200000
156060,MARYSVILLE - 7TH AVE (Marysville Junior High),2023-03-31,3.7,1680213600000
156061,MARYSVILLE - 7TH AVE (Marysville Junior High),2023-04-01,3.4,1680300000000
156062,MARYSVILLE - 7TH AVE (Marysville Junior High),2023-04-02,3.1,1680386400000


---

### <span style="color:#ff5f27;"> üîÆ Connecting to Hopsworks Feature Store </span>

In [29]:
import hopsworks

project = hopsworks.login()

fs = project.get_feature_store() 

Connected. Call `.close()` to terminate connection gracefully.
Copy your Api Key (first register/login): https://c.app.hopsworks.ai/account/api/generated
Connected. Call `.close()` to terminate connection gracefully.

Multiple projects found. 

	 (1) annikaij
	 (2) miknie20

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/549019
Connected. Call `.close()` to terminate connection gracefully.


In [30]:
from great_expectations.core import ExpectationSuite, ExpectationConfiguration

expectation_suite = ExpectationSuite(expectation_suite_name="pmi_data")

expectation_suite.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "pm2_5", 
            "min_value": 0.0,
            "max_value": 1000.0,
        }
    )
)

{"expectation_type": "expect_column_values_to_be_between", "kwargs": {"column": "pm2_5", "min_value": 0.0, "max_value": 1000.0}, "meta": {}}

## <span style="color:#ff5f27;">ü™Ñ Creating Feature Groups</span>

### <span style='color:#ff5f27'> üå´ Air Quality Data

In [31]:
air_quality_fg = fs.get_or_create_feature_group(
    name='air_quality',
    description='Air Quality characteristics of each day',
    version=1,
    primary_key=['city_name'], #'unix_time',
    online_enabled=False,
    expectation_suite = expectation_suite,
    event_time="unix_time"
)    

In [32]:
air_quality_fg.insert(df_air_quality, write_options={"wait_for_job": False})

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/549019/fs/544841/fg/758117
Validation failed.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/549019/fs/544841/fg/758117


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 156064/156064 | Elapsed Time: 00:16 | Remaining Time: 00:00


Launching job: air_quality_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/549019/jobs/named/air_quality_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x7fb0dce8c6d0>,
 {
   "evaluation_parameters": {},
   "success": false,
   "statistics": {
     "evaluated_expectations": 1,
     "successful_expectations": 0,
     "unsuccessful_expectations": 1,
     "success_percent": 0.0
   },
   "results": [
     {
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       },
       "expectation_config": {
         "expectation_type": "expect_column_values_to_be_between",
         "kwargs": {
           "column": "pm2_5",
           "min_value": 0.0,
           "max_value": 1000.0
         },
         "meta": {
           "expectationId": 473089
         }
       },
       "success": false,
       "result": {
         "element_count": 156064,
         "missing_count": 0,
         "missing_percent": 0.0,
         "unexpected_count": 84,
         "unexpected_percent": 0.05382407217551774,
         "unexpected_percent_total": 0.0538240721755177

### <span style='color:#ff5f27'> üå¶ Weather Data

In [33]:
weather_fg = fs.get_or_create_feature_group(
    name='weather',
    description='Weather characteristics of each day',
    version=1,
    primary_key=['city_name'], #'unix_time'
    online_enabled=False,
    event_time="unix_time"
) 

In [34]:
weather_fg.insert(df_weather, write_options={"wait_for_job": False})

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/549019/fs/544841/fg/760147


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 169515/169515 | Elapsed Time: 00:22 | Remaining Time: 00:00


Launching job: weather_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/549019/jobs/named/weather_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x7fb0dcedaf50>, None)