# Aksel Joonas Reedi
# typehints
# 8db7b4c
import os
from datetime import date, datetime, timedelta
import joblib
import pandas as pd
import torch
from dotenv import load_dotenv
from huggingface_hub import hf_hub_download, login
from src.data_api_calls import (
get_combined_data,
update_pollution_data,
update_weather_data,
)
from src.features_pipeline import create_features
# Import-time setup: both calls below run as a side effect of importing this module.
# load_dotenv() is expected to populate the process environment from a local
# .env file (python-dotenv) so the token lookup below can succeed.
load_dotenv()
# Authenticate against the Hugging Face Hub with the token stored in the
# HUGGINGFACE_DOWNLOAD_TOKEN environment variable.
# NOTE(review): os.getenv returns None when the variable is unset; confirm how
# login() should behave in that case for non-interactive deployments.
login(token=os.getenv("HUGGINGFACE_DOWNLOAD_TOKEN"))
def load_nn() -> torch.nn.Module:
    """
    Loads the neural network model for air pollution forecasting.

    The architecture is declared locally and its weights are fetched from the
    Hugging Face Hub repository "akseljoonas/Utrecht_pollution_forecasting_NO2".
    The returned module is switched to evaluation mode so that its dropout
    layers are inactive when it is used for prediction.

    Returns:
        torch.nn.Module: The loaded neural network model, in eval mode.
    """
    import torch.nn as nn
    from huggingface_hub import PyTorchModelHubMixin

    class AirPollutionNet(nn.Module, PyTorchModelHubMixin):
        """Fully connected network: repeated Linear -> ReLU -> Dropout, then a 3-unit head."""

        def __init__(self, input_size: int, layers: list[int], dropout_rate: float):
            """
            Builds the hidden stack and the output layer.

            Args:
                input_size (int): Number of input features.
                layers (list[int]): Width of each hidden layer, in order.
                dropout_rate (float): Dropout probability applied after each hidden layer.
            """
            super().__init__()
            self.layers_list = nn.ModuleList()
            in_features = input_size
            for units in layers:
                self.layers_list.append(nn.Linear(in_features, units))
                self.layers_list.append(nn.ReLU())
                self.layers_list.append(nn.Dropout(p=dropout_rate))
                in_features = units
            self.output = nn.Linear(in_features, 3)  # Output size is 3 for next 3 days

        def forward(self, x: torch.Tensor) -> torch.Tensor:
            """
            Forward pass of the neural network.

            Args:
                x (torch.Tensor): Input tensor.

            Returns:
                torch.Tensor: Output tensor after passing through the network.
            """
            for layer in self.layers_list:
                x = layer(x)
            x = self.output(x)
            return x

    # NOTE(review): the constructor arguments are presumably restored from the
    # configuration stored alongside the weights in the Hub repo - confirm.
    model = AirPollutionNet.from_pretrained(
        "akseljoonas/Utrecht_pollution_forecasting_NO2"
    )
    # The network contains Dropout layers, and torch.no_grad() at the call site
    # does not disable them; only eval mode does. Without this, forecasts could
    # be randomly perturbed on every run. eval() is idempotent, so it is
    # harmless if the loader already selected eval mode.
    model.eval()
    return model
def load_model(particle: str) -> object:
    """
    Returns the forecasting model that belongs to the given particle.

    Args:
        particle (str): The type of particle ("O3" or "NO2").

    Returns:
        object: For "O3", the model unpickled from "O3_svr_model.pkl" in the
        particle's Hub repository; for any other value, the neural network
        returned by ``load_nn``.
    """
    if particle != "O3":
        # Every value other than "O3" is served by the NO2 neural network.
        return load_nn()

    svr_path = hf_hub_download(
        repo_id=f"elisaklunder/Utrecht-{particle}-Forecasting-Model",
        filename="O3_svr_model.pkl",
    )
    # NOTE(review): joblib.load unpickles the downloaded file, so this relies
    # on the Hub repository being trusted.
    return joblib.load(svr_path)
def run_model(particle: str, data: pd.DataFrame) -> list:
    """
    Produces forecasts for one particle from the supplied data.

    Features are built with ``create_features``, fed to the model returned by
    ``load_model``, and the raw output is mapped back through the target
    scaler downloaded from the matching Hub repository.

    Args:
        particle (str): The type of particle ("O3" or "NO2").
        data (pd.DataFrame): The input data for making predictions.

    Returns:
        list: The predictions for the specified particle, as returned by the
        target scaler's ``inverse_transform``.
    """
    features = create_features(data=data, target_particle=particle)
    forecaster = load_model(particle)

    if particle != "NO2":
        # Non-NO2 models expose a predict() method.
        raw_output = forecaster.predict(features)
        scaler_repo = f"elisaklunder/Utrecht-{particle}-Forecasting-Model"
        scaler_file = f"target_scaler_{particle}.joblib"
    else:
        # The NO2 model is a torch module: call it directly without tracking gradients.
        with torch.no_grad():
            raw_output = forecaster(
                torch.tensor(features.values, dtype=torch.float32)
            )
        scaler_repo = "akseljoonas/Utrecht_pollution_forecasting_NO2"
        scaler_file = "target_scaler_NO2.joblib"

    # Both branches fetch their scaler the same way; only repo and file name differ.
    scaler_path = hf_hub_download(repo_id=scaler_repo, filename=scaler_file)
    scaler = joblib.load(scaler_path)
    return scaler.inverse_transform(raw_output)
def update_data_and_predictions() -> None:
    """
    Updates the weather and pollution data, makes predictions for O3 and NO2,
    and stores them in a CSV file.

    One row per pollutant is written for each of the next three days to
    "predictions_history.csv". Rows already recorded as predicted today are
    replaced, and exact duplicate rows are removed before saving.
    """
    update_weather_data()
    update_pollution_data()

    week_data = get_combined_data()
    o3_predictions = run_model("O3", data=week_data)
    no2_predictions = run_model("NO2", data=week_data)

    # Read the clock once so every row of this run, and the "made today"
    # filter below, share the same reference day even if the run crosses midnight.
    today = date.today()
    forecasts = (("O3", o3_predictions), ("NO2", no2_predictions))
    # Row order matches the history file layout: O3 then NO2 for each day ahead.
    prediction_data = [
        {
            "pollutant": pollutant,
            "date_predicted": today,
            "date": today + timedelta(days=offset + 1),
            "prediction_value": predictions[0][offset],
        }
        for offset in range(3)
        for pollutant, predictions in forecasts
    ]
    predictions_df = pd.DataFrame(prediction_data)

    PREDICTIONS_FILE = "predictions_history.csv"
    if os.path.exists(PREDICTIONS_FILE):
        existing_data = pd.read_csv(PREDICTIONS_FILE)
        # Filter out predictions made today to avoid duplicates
        existing_data = existing_data[existing_data["date_predicted"] != str(today)]
        combined_data = pd.concat([existing_data, predictions_df])
        # drop_duplicates returns a new frame rather than modifying in place,
        # so the result has to be rebound for the de-duplication to take effect.
        combined_data = combined_data.drop_duplicates()
    else:
        combined_data = predictions_df

    combined_data.to_csv(PREDICTIONS_FILE, index=False)
def get_data_and_predictions() -> tuple[pd.DataFrame, list, list]:
    """
    Fetches the combined data together with the O3 and NO2 predictions that
    were recorded today in "predictions_history.csv".

    Returns:
        tuple: A tuple containing:
            - week_data (pd.DataFrame): The combined data for the week.
            - list: Single-element list holding the array of O3 prediction values.
            - list: Single-element list holding the array of NO2 prediction values.
    """
    week_data = get_combined_data()

    history = pd.read_csv("predictions_history.csv")
    today_stamp = datetime.today().strftime("%Y-%m-%d")
    made_today = history[history["date_predicted"] == today_stamp]

    def values_for(pollutant: str):
        """Return today's prediction values for one pollutant."""
        rows = made_today[made_today["pollutant"] == pollutant]
        return rows["prediction_value"].values

    # Each array is wrapped in a list so callers can index it as result[0][i].
    return week_data, [values_for("O3")], [values_for("NO2")]
# Script entry point: when executed directly (not imported), refresh the data
# and write a new set of predictions to the history file.
if __name__ == "__main__":
    update_data_and_predictions()