|
import requests |
|
import os |
|
import joblib |
|
import pandas as pd |
|
import datetime |
|
import numpy as np |
|
import time |
|
from sklearn.preprocessing import OrdinalEncoder |
|
from dotenv import load_dotenv |
|
load_dotenv(override=True) |
|
|
|
|
|
def decode_features(df, feature_view):
    """Decode (inverse-transform) feature columns of *df* using the
    transformation functions attached to a Hopsworks Feature View.

    Supports the built-in ``min_max_scaler``, ``standard_scaler`` and
    ``label_encoder`` transformations; columns without a recognised
    transformation are left untouched.

    Parameters
    ----------
    df : pd.DataFrame
        Frame whose columns may hold transformed feature values.
    feature_view :
        Hopsworks feature view whose batch-scoring server holds the fitted
        transformation functions.

    Returns
    -------
    pd.DataFrame
        A copy of *df* with the recognised columns decoded; *df* itself is
        not modified.
    """
    import inspect

    df_res = df.copy()

    # NOTE(review): reaches into private Hopsworks attributes — confirm this
    # still works after library upgrades.
    td_transformation_functions = feature_view._batch_scoring_server._transformation_functions

    for feature_name, td_transformation_function in td_transformation_functions.items():
        if feature_name not in df_res.columns:
            continue

        # The fitted statistics (min/max, mean/std_dev, value_to_index) are
        # baked into the transformation function's signature as keyword
        # defaults, so recover them from the signature.
        sig = inspect.signature(td_transformation_function.transformation_fn)
        param_dict = {
            param.name: param.default
            for param in sig.parameters.values()
            if param.default is not inspect.Parameter.empty
        }

        if td_transformation_function.name == "min_max_scaler":
            # Invert x' = (x - min) / (max - min).
            df_res[feature_name] = df_res[feature_name].map(
                lambda x: x * (param_dict["max_value"] - param_dict["min_value"]) + param_dict["min_value"])
        elif td_transformation_function.name == "standard_scaler":
            # Invert x' = (x - mean) / std_dev.
            df_res[feature_name] = df_res[feature_name].map(
                lambda x: x * param_dict['std_dev'] + param_dict["mean"])
        elif td_transformation_function.name == "label_encoder":
            # Invert the label -> index mapping back to original labels.
            index_to_value = {v: k for k, v in param_dict['value_to_index'].items()}
            df_res[feature_name] = df_res[feature_name].map(
                lambda x: index_to_value[x])
    return df_res
|
|
|
|
|
def get_model(project, model_name, evaluation_metric, sort_metrics_by):
    """Return a trained model, reusing a local download when available.

    Searches the current directory tree for a previously downloaded
    ``model.pkl``; when none is found, fetches the best-ranked model from
    the Hopsworks Model Registry (which physically downloads it next to
    this script).

    Parameters
    ----------
    project :
        Logged-in Hopsworks project handle.
    model_name : str
        Name of the model in the registry.
    evaluation_metric : str
        Metric used to rank model versions.
    sort_metrics_by : str
        Sort direction understood by the registry (e.g. "min"/"max").

    Returns
    -------
    The unpickled model object.
    """
    TARGET_FILE = "model.pkl"
    # Look for an already-downloaded model anywhere under the working dir.
    list_of_files = [os.path.join(dirpath, filename)
                     for dirpath, _, filenames in os.walk('.')
                     for filename in filenames if filename == TARGET_FILE]

    if list_of_files:
        # Reuse the first local copy found.
        model = joblib.load(list_of_files[0])
    else:
        # No local copy: download from the registry. (The original guarded
        # this with a redundant os.path.exists check — os.walk('.') already
        # covers the current directory — which, had it ever been False,
        # would have left `model` unbound.)
        mr = project.get_model_registry()
        model = mr.get_best_model(model_name,
                                  evaluation_metric,
                                  sort_metrics_by)
        model_dir = model.download()
        model = joblib.load(model_dir + "/model.pkl")

    return model
|
|
|
|
|
def get_air_quality_data(station_name):
    """Fetch today's air-quality reading for *station_name* from the WAQI API.

    Returns a list ``[date, pm25, pm10, aqi]`` where ``date`` is a
    "YYYY-MM-DD" string, the particulate values are today's forecast
    averages, and ``aqi`` is the larger of the two.
    """
    token = os.getenv('AIR_QUALITY_API_KEY')
    url = f'https://api.waqi.info/feed/{station_name}/?token={token}'
    payload = requests.get(url).json()["data"]
    daily = payload['forecast']['daily']

    pm25 = int(daily['pm25'][0]['avg'])
    pm10 = int(daily['pm10'][0]['avg'])

    return [
        payload["time"]["s"][:10],  # keep only the date part of the timestamp
        pm25,
        pm10,
        max(pm25, pm10),
    ]
|
|
|
def get_air_quality_df(data):
    """Wrap a single ``[date, pm25, pm10, aqi]`` record in a one-row DataFrame.

    The three pollutant columns are coerced to numeric dtypes; the frame is
    printed (for logging) before being returned.
    """
    columns = ['date', 'pm25', 'pm10', 'aqi']

    frame = pd.DataFrame(data).T
    frame.columns = columns

    for col in ('pm25', 'pm10', 'aqi'):
        frame[col] = pd.to_numeric(frame[col])

    print(frame)
    return frame
|
|
|
|
|
def get_weather_data_daily(city):
    """Fetch today's weather for *city* from the Visual Crossing timeline API.

    Returns a flat list of readings in the column order expected by
    ``get_weather_df``.

    Bug fix: the original accidentally nested a second ``def`` with the same
    name inside this one, so calling it defined an inner function, made no
    request, and returned ``None``.
    """
    WEATHER_API_KEY = os.getenv('WEATHER_API_KEY')
    answer = requests.get(f'https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/{city}/today?unitGroup=metric&include=days&key={WEATHER_API_KEY}&contentType=json').json()
    data = answer['days'][0]
    # NOTE(review): Visual Crossing responses typically expose the place name
    # as 'address'/'resolvedAddress' and the day as 'datetime' — confirm the
    # 'city' and 'date' keys below against a live response.
    return [
        answer['city'],
        data['date'],
        data['tempmax'],
        data['tempmin'],
        data['temp'],
        data['feelslikemax'],
        data['feelslikemin'],
        data['feelslike'],
        data['dew'],
        data['humidity'],
        data['precip'],
        data['precipprob'],
        data['precipcover'],
        data['snow'],
        data['snowdepth'],
        data['windgust'],
        data['windspeed'],
        data['winddir'],
        data['pressure'],
        data['cloudcover'],
        data['visibility'],
        data['solarradiation'],
        data['solarenergy'],
        data['uvindex'],
        data['conditions']
    ]
|
|
|
def get_weather_data_weekly(city: str, start_date: datetime) -> pd.DataFrame:
    """Fetch the next-7-days daily forecast for *city* as a DataFrame.

    Bug/security fix: the original ignored the *city* argument, always
    requesting Beijing, and embedded a hard-coded API key in the URL. The
    key now comes from the ``WEATHER_API_KEY`` environment variable (as in
    the other fetchers) and the requested city is actually used.

    Parameters
    ----------
    city : str
        City name understood by the Visual Crossing timeline API.
    start_date : datetime
        Unused; kept for backward compatibility with existing callers.

    Returns
    -------
    pd.DataFrame
        One row per forecast day, with 'datetime' renamed to 'date',
        'sealevelpressure' renamed to 'pressure', and columns the model
        does not use dropped.
    """
    WEATHER_API_KEY = os.getenv('WEATHER_API_KEY')
    # read_csv already returns a DataFrame; no extra wrapping needed.
    df_weather = pd.read_csv(
        f'https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/{city}/next7days?unitGroup=metric&include=days&key={WEATHER_API_KEY}&contentType=csv'
    )

    df_weather.rename(columns={"datetime": "date",
                               "name": "city",
                               "sealevelpressure": "pressure"},
                      inplace=True)
    df_weather = df_weather.drop(labels=['city','dew','precip','tempmax','pressure','tempmin','temp','feelslikemax','feelslikemin','feelslike','precipprob','precipcover','snow','snowdepth','cloudcover','severerisk','moonphase','preciptype','sunrise','sunset','conditions','description','icon','stations'], axis=1)

    return df_weather
|
|
|
def get_weather_df(data):
    """Wrap a single daily-weather record (a flat list) in a one-row DataFrame.

    The list order must match what ``get_weather_data_daily`` produces.
    Every column except 'name', 'date' and 'conditions' is coerced to a
    numeric dtype.
    """
    col_names = [
        'name', 'date',
        'tempmax', 'tempmin', 'temp',
        'feelslikemax', 'feelslikemin', 'feelslike',
        'dew', 'humidity',
        'precip', 'precipprob', 'precipcover',
        'snow', 'snowdepth',
        'windgust', 'windspeed', 'winddir',
        'pressure', 'cloudcover', 'visibility',
        'solarradiation', 'solarenergy', 'uvindex',
        'conditions',
    ]

    frame = pd.DataFrame(data).T
    frame.columns = col_names

    non_numeric = {'name', 'date', 'conditions'}
    for col in col_names:
        if col in non_numeric:
            continue
        frame[col] = pd.to_numeric(frame[col])

    return frame
|
|
|
def data_encoder(X):
    """Prepare a weather frame for the model, mutating *X* in place.

    Drops the non-feature 'date' and 'name' columns and ordinal-encodes the
    free-text 'conditions' column. Returns the same (mutated) frame.
    """
    X.drop(columns=['date', 'name'], inplace=True)
    encoder = OrdinalEncoder()
    X['conditions'] = encoder.fit_transform(X[['conditions']])
    return X
|
|
|
def transform(df):
    """Impute missing values in a weather DataFrame, in place.

    Rules:
      - missing 'windgust' falls back to that row's 'windspeed'
      - missing 'snow' / 'snowdepth' become 0
      - missing 'pressure' becomes the column mean

    Returns the same (mutated) frame.

    Fix: the original used ``df[col].fillna(..., inplace=True)`` — a chained
    assignment that is deprecated and becomes a silent no-op under pandas
    copy-on-write; direct column assignment is used instead.
    """
    df.loc[df['windgust'].isna(), 'windgust'] = df['windspeed']
    df['snow'] = df['snow'].fillna(0)
    df['snowdepth'] = df['snowdepth'].fillna(0)
    df['pressure'] = df['pressure'].fillna(df['pressure'].mean())
    return df
|
|
|
|
|
def get_aplevel(temps: np.ndarray) -> list:
    """Map AQI values to US EPA-style air-pollution category names.

    Parameters:
        temps: array of AQI values; the broadcasting below assumes shape
               (n_samples, 1) — TODO confirm against callers.

    Returns:
        A list with one category string per row of *temps*.
        NOTE(review): a value of exactly 0 exceeds no boundary and so
        produces NO entry for that row — confirm inputs are always > 0.
    """
    # Lower bounds of the six categories, in order.
    boundary_list = np.array([0, 50, 100, 150, 200, 300])
    # redf[i, j] is True while temps[i] is still strictly above boundary j.
    redf = np.logical_not(temps<=boundary_list)
    # Shift each row left by one (dropping the wrapped element, padding the
    # last column with False) so comparing with redf flags exactly the
    # position where True turns False — i.e. the highest boundary exceeded.
    hift = np.concatenate((np.roll(redf, -1)[:, :-1], np.full((temps.shape[0], 1), False)), axis = 1)
    cat = np.nonzero(np.not_equal(redf,hift))

    air_pollution_level = ['Good', 'Moderate', 'Unhealthy for sensitive Groups','Unhealthy' ,'Very Unhealthy', 'Hazardous']
    # cat[1] holds the column (category) index of each flagged transition.
    level = [air_pollution_level[el] for el in cat[1]]
    return level
|
|
|
def timestamp_2_time(x):
    """Convert a 'YYYY-MM-DD' date string to a Unix epoch in milliseconds.

    Uses a naive datetime, so the result is relative to the local timezone.
    """
    parsed = datetime.datetime.strptime(str(x), '%Y-%m-%d')
    millis = parsed.timestamp() * 1000
    return int(millis)