"""Utrecht air pollution forecasting: update input data, load the O3/NO2
models from the Hugging Face Hub, and store three-day predictions."""

import os
from datetime import date, datetime, timedelta

import joblib
import numpy as np
import pandas as pd
import torch
from dotenv import load_dotenv
from huggingface_hub import hf_hub_download, login

from src.data_api_calls import (
    get_combined_data,
    update_pollution_data,
    update_weather_data,
)
from src.features_pipeline import create_features

# Authenticate with the Hugging Face Hub using the read token from .env
load_dotenv()
login(token=os.getenv("HUGGINGFACE_DOWNLOAD_TOKEN"))


def load_nn() -> torch.nn.Module:
    """
    Loads the neural network model for air pollution forecasting.

    Returns:
        torch.nn.Module: The loaded neural network model.
    """
    # Imported lazily so the rest of the module works without them
    import torch.nn as nn
    from huggingface_hub import PyTorchModelHubMixin

    class AirPollutionNet(nn.Module, PyTorchModelHubMixin):
        def __init__(self, input_size: int, layers: list[int], dropout_rate: float):
            super().__init__()
            self.layers_list = nn.ModuleList()
            in_features = input_size

            # Stack of Linear -> ReLU -> Dropout blocks
            for units in layers:
                self.layers_list.append(nn.Linear(in_features, units))
                self.layers_list.append(nn.ReLU())
                self.layers_list.append(nn.Dropout(p=dropout_rate))
                in_features = units

            # Three outputs: one prediction per forecast day
            self.output = nn.Linear(in_features, 3)

        def forward(self, x: torch.Tensor) -> torch.Tensor:
            """
            Forward pass of the neural network.

            Args:
                x (torch.Tensor): Input tensor.

            Returns:
                torch.Tensor: Output tensor after passing through the network.
            """
            for layer in self.layers_list:
                x = layer(x)
            x = self.output(x)
            return x

    model = AirPollutionNet.from_pretrained(
        "akseljoonas/Utrecht_pollution_forecasting_NO2"
    )
    model.eval()  # disable dropout for deterministic inference
    return model
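
# Example usage (sketch), assuming `X` is the feature DataFrame produced by
# create_features(data, target_particle="NO2"):
#
#   nn_model = load_nn()
#   with torch.no_grad():
#       y_scaled = nn_model(torch.tensor(X.values, dtype=torch.float32))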


def load_model(particle: str) -> object:
    """
    Loads the forecasting model based on the specified particle.

    Args:
        particle (str): The type of particle ("O3" or "NO2").

    Returns:
        object: The loaded model (either a neural network or a support
            vector regression model).
    """
    repo_id = f"elisaklunder/Utrecht-{particle}-Forecasting-Model"
    if particle == "O3":
        # The O3 model is a pickled support vector regression model
        file_name = "O3_svr_model.pkl"
        model_path = hf_hub_download(repo_id=repo_id, filename=file_name)
        model = joblib.load(model_path)
    else:
        model = load_nn()

    return model


def run_model(particle: str, data: pd.DataFrame) -> np.ndarray:
    """
    Runs the model for the specified particle and makes predictions based on
    the input data.

    Args:
        particle (str): The type of particle ("O3" or "NO2").
        data (pd.DataFrame): The input data for making predictions.

    Returns:
        np.ndarray: Array of shape (1, 3) with predictions for the next
            three days.
    """
    input_data = create_features(data=data, target_particle=particle)
    model = load_model(particle)

    if particle == "NO2":
        with torch.no_grad():
            prediction = model(
                torch.tensor(input_data.values, dtype=torch.float32)
            ).numpy()
        repo_id = "akseljoonas/Utrecht_pollution_forecasting_NO2"
        file_name = "target_scaler_NO2.joblib"
    else:
        prediction = model.predict(input_data)
        repo_id = f"elisaklunder/Utrecht-{particle}-Forecasting-Model"
        file_name = f"target_scaler_{particle}.joblib"

    # Predictions are made on a scaled target; invert the scaling to get
    # values back in the original units.
    path = hf_hub_download(repo_id=repo_id, filename=file_name)
    target_scaler = joblib.load(path)
    prediction = target_scaler.inverse_transform(prediction)

    return prediction
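
# Example (sketch): with `week_data` from get_combined_data(), run_model
# returns one row of three day-ahead forecasts:
#
#   preds = run_model("O3", data=week_data)
#   day1, day2, day3 = preds[0]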


def update_data_and_predictions() -> None:
    """
    Updates the weather and pollution data, makes predictions for O3 and NO2,
    and stores them in a CSV file.
    """
    update_weather_data()
    update_pollution_data()

    week_data = get_combined_data()

    o3_predictions = run_model("O3", data=week_data)
    no2_predictions = run_model("NO2", data=week_data)

    # Build one row per pollutant per forecast day (1-3 days ahead)
    prediction_data = []
    for i in range(3):
        prediction_data.append(
            {
                "pollutant": "O3",
                "date_predicted": date.today(),
                "date": date.today() + timedelta(days=i + 1),
                "prediction_value": o3_predictions[0][i],
            }
        )
        prediction_data.append(
            {
                "pollutant": "NO2",
                "date_predicted": date.today(),
                "date": date.today() + timedelta(days=i + 1),
                "prediction_value": no2_predictions[0][i],
            }
        )

    predictions_df = pd.DataFrame(prediction_data)

    PREDICTIONS_FILE = "predictions_history.csv"

    if os.path.exists(PREDICTIONS_FILE):
        existing_data = pd.read_csv(PREDICTIONS_FILE)

        # Drop any rows already predicted today so reruns overwrite them
        existing_data = existing_data[
            existing_data["date_predicted"] != str(date.today())
        ]
        combined_data = pd.concat([existing_data, predictions_df])
        combined_data = combined_data.drop_duplicates()
    else:
        combined_data = predictions_df

    combined_data.to_csv(PREDICTIONS_FILE, index=False)
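
# predictions_history.csv layout (one row per pollutant per forecast day):
#   pollutant, date_predicted, date, prediction_value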


def get_data_and_predictions() -> tuple[pd.DataFrame, list, list]:
    """
    Retrieves combined data and today's predictions for O3 and NO2.

    Returns:
        tuple: A tuple containing:
            - week_data (pd.DataFrame): The combined data for the week.
            - list: Predictions for O3.
            - list: Predictions for NO2.
    """
    week_data = get_combined_data()

    PREDICTIONS_FILE = "predictions_history.csv"
    data = pd.read_csv(PREDICTIONS_FILE)

    today = datetime.today().strftime("%Y-%m-%d")
    today_predictions = data[data["date_predicted"] == today]

    o3_predictions = today_predictions[today_predictions["pollutant"] == "O3"][
        "prediction_value"
    ].values
    no2_predictions = today_predictions[today_predictions["pollutant"] == "NO2"][
        "prediction_value"
    ].values

    return week_data, [o3_predictions], [no2_predictions]
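
# Example (sketch): unpack today's stored forecasts; each prediction list
# holds one array with the three day-ahead values.
#
#   week_data, o3, no2 = get_data_and_predictions()
#   print(o3[0])  # three O3 values for the next three days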


if __name__ == "__main__":
    update_data_and_predictions()