|
import os
import tempfile

import pandas as pd
from transformers import Trainer, TrainingArguments, set_seed

from tsfm_public import TimeSeriesPreprocessor
from tsfm_public.toolkit.dataset import ForecastDFDataset
from tsfm_public.toolkit.get_model import get_model

SEED = 42
TTM_MODEL_PATH = "iaravagni/ttm-finetuned-model"
CONTEXT_LENGTH = 52
PREDICTION_LENGTH = 6
OUT_DIR = "ttm_finetuned_models/"
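
# Each forecasting window feeds CONTEXT_LENGTH past observations to the model
# and predicts PREDICTION_LENGTH future steps. These values are assumed to
# match the windowing used when the checkpoint in TTM_MODEL_PATH was trained.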


def setup_environment():
    """
    Set up the environment for model training and evaluation.

    Sets the random seed for reproducibility and creates the output
    directories.
    """
    set_seed(SEED)
    os.makedirs(OUT_DIR, exist_ok=True)
    os.makedirs(os.path.join(OUT_DIR, "results"), exist_ok=True)


def load_dataset(file_path):
    """
    Load the dataset from the specified file path.

    Args:
        file_path (str): Path to the dataset CSV file

    Returns:
        pd.DataFrame: The loaded dataset
    """
    return pd.read_csv(file_path)


def prepare_data(data, timestamp_column):
    """
    Prepare the dataset by converting the timestamp column to datetime format.

    Args:
        data (pd.DataFrame): The dataset
        timestamp_column (str): Name of the timestamp column

    Returns:
        pd.DataFrame: The processed dataset
    """
    data[timestamp_column] = pd.to_datetime(data[timestamp_column])
    return data


def get_column_specs():
    """
    Define and return column specifications for the dataset.

    Returns:
        dict: Column specifications including timestamp, ID, target, and control columns
    """
    return {
        "timestamp_column": "Timestamp",
        "id_columns": ["patient_id"],
        "target_columns": ["Glucose"],
        "control_columns": ["Accelerometer", "Calories", "Carbs", "Sugar", "Gender", "HbA1c", "Age"],
    }
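
# Note: in the tsfm_public toolkit, control columns are exogenous input
# channels passed to the model alongside the target. Here they mix
# time-varying signals (Accelerometer, Calories, Carbs, Sugar) with
# per-patient attributes (Gender, HbA1c, Age) that are constant over time.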


def create_test_only_dataset(ts_preprocessor, test_dataset, train_dataset=None, stride=1, enable_padding=True, **dataset_kwargs):
    """
    Create a preprocessed PyTorch dataset for testing only.

    Args:
        ts_preprocessor: TimeSeriesPreprocessor instance
        test_dataset: Pandas dataframe for testing
        train_dataset: Optional pandas dataframe for training the scaler
        stride: Stride used for creating the dataset
        enable_padding: If True, datasets are created with padding
        dataset_kwargs: Additional keyword arguments to pass to ForecastDFDataset

    Returns:
        ForecastDFDataset for testing
    """
    test_data = ts_preprocessor._standardize_dataframe(test_dataset)

    # Fit the preprocessor (scalers/encoders) on the training data when it is
    # available, so no statistics leak from the test set; otherwise fall back
    # to fitting on the test data itself.
    if train_dataset is not None:
        train_data = ts_preprocessor._standardize_dataframe(train_dataset)
        ts_preprocessor.train(train_data)
    else:
        ts_preprocessor.train(test_data)

    test_data_prep = test_data.copy()

    column_specifiers = {
        "id_columns": ts_preprocessor.id_columns,
        "timestamp_column": ts_preprocessor.timestamp_column,
        "target_columns": ts_preprocessor.target_columns,
        "observable_columns": ts_preprocessor.observable_columns,
        "control_columns": ts_preprocessor.control_columns,
        "conditional_columns": ts_preprocessor.conditional_columns,
        "categorical_columns": ts_preprocessor.categorical_columns,
        "static_categorical_columns": ts_preprocessor.static_categorical_columns,
    }

    # Copy before extending so the column specifiers themselves are not mutated.
    params = dict(column_specifiers)
    params["context_length"] = ts_preprocessor.context_length
    params["prediction_length"] = ts_preprocessor.prediction_length
    params["stride"] = stride
    params["enable_padding"] = enable_padding
    params["frequency_token"] = ts_preprocessor.get_frequency_token(ts_preprocessor.freq)
    params.update(**dataset_kwargs)
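
    # With stride=1, a series of length N yields N - context_length -
    # prediction_length + 1 full windows; with enable_padding=True, series too
    # short for even one window are padded so they still contribute. (Sketch
    # of the expected windowing, not an exact count for every
    # ForecastDFDataset configuration.)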
    test_dataset = ForecastDFDataset(test_data_prep, **params)

    if len(test_dataset) == 0:
        raise RuntimeError("The generated test dataset is of zero length.")

    return test_dataset


def zeroshot_eval(train_df, test_df, batch_size, context_length=CONTEXT_LENGTH, forecast_length=PREDICTION_LENGTH, model_path=TTM_MODEL_PATH):
    """
    Perform zero-shot evaluation of time series forecasting on test data.

    Args:
        train_df: Training dataframe
        test_df: Testing dataframe
        batch_size: Batch size for evaluation
        context_length: Number of time steps to use as context
        forecast_length: Number of time steps to predict
        model_path: Path or Hugging Face Hub ID of the model to load

    Returns:
        dict: Dictionary containing predictions dataframe and metrics
    """
    column_specifiers = get_column_specs()

    tsp = TimeSeriesPreprocessor(
        timestamp_column=column_specifiers["timestamp_column"],
        id_columns=column_specifiers["id_columns"],
        target_columns=column_specifiers["target_columns"],
        control_columns=column_specifiers["control_columns"],
        context_length=context_length,
        prediction_length=forecast_length,
        scaling=False,
        encode_categorical=False,
        force_return="zeropad",
    )
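
    # get_model() selects a TTM checkpoint compatible with the requested
    # context and prediction lengths from model_path; the prefer_* flags bias
    # which variant is chosen when several are available.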
    zeroshot_model = get_model(
        model_path,
        context_length=context_length,
        prediction_length=forecast_length,
        freq_prefix_tuning=False,
        freq=None,
        prefer_l1_loss=False,
        prefer_longer_context=True,
    )

    dset_test = create_test_only_dataset(ts_preprocessor=tsp, test_dataset=test_df, train_dataset=train_df)
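
    # The Hugging Face Trainer is used here purely as a batched inference
    # harness: only predict() and evaluate() are called, so no training occurs
    # and the temporary output directory can be discarded afterwards.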
    temp_dir = tempfile.mkdtemp()
    zeroshot_trainer = Trainer(
        model=zeroshot_model,
        args=TrainingArguments(
            output_dir=temp_dir,
            per_device_eval_batch_size=batch_size,
            seed=SEED,
            report_to="none",
        ),
    )

    predictions_dict = zeroshot_trainer.predict(dset_test)
    processed_predictions = process_predictions(predictions_dict, tsp, column_specifiers["target_columns"])
    metrics = zeroshot_trainer.evaluate(dset_test)

    return {
        "predictions_df": processed_predictions,
        "metrics": metrics,
    }


def process_predictions(predictions_dict, tsp, target_columns):
    """
    Process the predictions from the Trainer into a usable DataFrame.

    Args:
        predictions_dict: Predictions from the Trainer
        tsp: TimeSeriesPreprocessor instance
        target_columns: List of target column names

    Returns:
        pd.DataFrame: DataFrame containing processed predictions
    """
    # Trainer.predict() returns a PredictionOutput with a .predictions
    # attribute; fall back to dict-style access for other inputs.
    if hasattr(predictions_dict, 'predictions'):
        raw_predictions = predictions_dict.predictions
    else:
        raw_predictions = predictions_dict.get('predictions', predictions_dict)

    # The model may return a tuple of outputs; the forecast tensor comes first.
    if isinstance(raw_predictions, tuple):
        predictions = raw_predictions[0]
    else:
        predictions = raw_predictions

    # Forecasts are shaped (n_windows, n_forecast_steps, n_target_channels).
    n_samples, n_timesteps, n_features = predictions.shape

    # Flatten to one column per (target, horizon step) pair.
    processed_df = pd.DataFrame()
    for i, col in enumerate(target_columns):
        if i < n_features:
            for t in range(n_timesteps):
                processed_df[f"{col}_step_{t+1}"] = predictions[:, t, i]

    return processed_df
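

# Illustrative layout of the frame returned above and consumed below
# (values are made up):
#     Glucose_step_1  Glucose_step_2  ...  Glucose_step_6
#     112.4           115.1           ...  121.9    <- window 0
#     114.0           116.8           ...  123.5    <- window 1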


def simple_diagonal_averaging(predictions_df, test_data, context_length, step_columns):
    """
    Diagonally average predictions by patient.

    For each timestamp beyond the context window, the forecast steps of the
    corresponding prediction row are averaged. The last rows are handled so
    that all available predicted values are used.

    Args:
        predictions_df (pd.DataFrame): DataFrame with step-wise predictions
        test_data (pd.DataFrame): Original test data with patient IDs
        context_length (int): Number of context steps used in the model
        step_columns (list): List of step column names

    Returns:
        pd.DataFrame: DataFrame with averaged predictions
    """
    final_df = test_data.copy()

    # Initialize with a float so later assignments do not change the dtype.
    final_df['averaged_prediction'] = 0.0

    for patient_id in test_data['patient_id'].unique():
        patient_mask = final_df['patient_id'] == patient_id
        patient_indices = final_df[patient_mask].index

        # The first context_length rows of each patient have no prediction.
        start_idx = min(context_length, len(patient_indices))

        for i in range(start_idx, len(patient_indices)):
            row_idx = patient_indices[i]
            # Note: this indexes predictions_df from 0 for every patient,
            # which assumes each patient's prediction rows start at the top
            # of the frame.
            pred_row_idx = i - context_length

            if pred_row_idx < len(predictions_df):
                avg_prediction = predictions_df.iloc[pred_row_idx][step_columns].mean()
                final_df.loc[row_idx, 'averaged_prediction'] = avg_prediction
            else:
                # Past the last prediction row: reuse the trailing steps of
                # the final row that still cover this timestamp.
                excess_steps = pred_row_idx - len(predictions_df) + 1
                if excess_steps < len(step_columns):
                    relevant_steps = step_columns[excess_steps:]
                    if relevant_steps:
                        avg_prediction = predictions_df.iloc[-1][relevant_steps].mean()
                        final_df.loc[row_idx, 'averaged_prediction'] = avg_prediction

    return final_df
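
# Worked example of the tail handling above (illustrative): with 6 step
# columns and pred_row_idx equal to len(predictions_df), excess_steps is 1,
# so the average is taken over steps 2..6 of the final prediction row.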


def main():
    """
    Main function to execute the time series forecasting workflow.
    """
    setup_environment()

    # Resolve data paths relative to this script.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    test_file = os.path.join(script_dir, '..', 'data', 'processed', 'test_dataset.csv')
    train_file = os.path.join(script_dir, '..', 'data', 'processed', 'train_dataset.csv')

    # Load and prepare the data.
    test_data = load_dataset(test_file)
    train_data = load_dataset(train_file)
    column_specs = get_column_specs()
    test_data = prepare_data(test_data, column_specs["timestamp_column"])
    train_data = prepare_data(train_data, column_specs["timestamp_column"])

    # Run zero-shot evaluation.
    results = zeroshot_eval(
        train_df=train_data,
        test_df=test_data,
        batch_size=8,
        context_length=CONTEXT_LENGTH,
        forecast_length=PREDICTION_LENGTH,
    )

    # Average the step-wise glucose predictions per timestamp.
    step_columns = [col for col in results["predictions_df"].columns if col.startswith("Glucose_step_")]
    final_results = simple_diagonal_averaging(
        results["predictions_df"],
        test_data,
        CONTEXT_LENGTH,
        step_columns,
    )

    # Save the raw and averaged predictions.
    output_dir = os.path.join(script_dir, '..', 'data', 'outputs')
    os.makedirs(output_dir, exist_ok=True)
    results["predictions_df"].to_csv(os.path.join(output_dir, 'dl_predictions_raw.csv'), index=False)
    final_results.to_csv(os.path.join(output_dir, 'dl_predictions.csv'), index=False)


if __name__ == "__main__":
    main()