|
import lightning as L
|
|
import omegaconf
|
|
import torch
|
|
from lightning.pytorch.loggers import WandbLogger
|
|
import wandb
|
|
from torch.utils.data import DataLoader
|
|
from lightning.pytorch.callbacks import TQDMProgressBar
|
|
from lightning.pytorch.callbacks.early_stopping import EarlyStopping
|
|
from config.config import Config
|
|
from models.engine import Engine
|
|
from preprocessing.fi_2010 import fi_2010_load
|
|
from preprocessing.lobster import lobster_load
|
|
from preprocessing.dataset import Dataset, DataModule
|
|
import constants as cst
|
|
|
|
|
|
def run(config: Config, accelerator, model=None):
|
|
run_name = ""
|
|
for param in config.model.keys():
|
|
value = config.model[param]
|
|
if param == "hyperparameters_sweep":
|
|
continue
|
|
if type(value) == omegaconf.dictconfig.DictConfig:
|
|
for key in value.keys():
|
|
run_name += str(key[:2]) + "_" + str(value[key]) + "_"
|
|
else:
|
|
run_name += str(param[:2]) + "_" + str(value.value) + "_"
|
|
run_name += f"seed_{config.experiment.seed}"
|
|
seq_size = config.model.hyperparameters_fixed["seq_size"]
|
|
horizon = config.experiment.horizon
|
|
training_stocks = config.experiment.training_stocks
|
|
dataset = config.experiment.dataset_type.value
|
|
if dataset == "LOBSTER":
|
|
config.experiment.filename_ckpt = f"{dataset}_{training_stocks}_seq_size_{seq_size}_horizon_{horizon}_{run_name}"
|
|
else:
|
|
config.experiment.filename_ckpt = f"{dataset}_seq_size_{seq_size}_horizon_{horizon}_{run_name}"
|
|
run_name = config.experiment.filename_ckpt
|
|
|
|
trainer = L.Trainer(
|
|
accelerator=accelerator,
|
|
precision=cst.PRECISION,
|
|
max_epochs=config.experiment.max_epochs,
|
|
callbacks=[
|
|
EarlyStopping(monitor="val_loss", mode="min", patience=2, verbose=True, min_delta=0.002),
|
|
TQDMProgressBar(refresh_rate=100)
|
|
],
|
|
num_sanity_val_steps=0,
|
|
detect_anomaly=False,
|
|
profiler=None,
|
|
check_val_every_n_epoch=1
|
|
)
|
|
train(config, trainer)
|
|
|
|
|
|
def train(config: Config, trainer: L.Trainer, run=None):
|
|
print_setup(config)
|
|
dataset_type = config.experiment.dataset_type.value
|
|
seq_size = config.model.hyperparameters_fixed["seq_size"]
|
|
horizon = config.experiment.horizon
|
|
model_type = config.model.type
|
|
training_stocks = config.experiment.training_stocks
|
|
testing_stocks = config.experiment.testing_stocks
|
|
dataset_type = config.experiment.dataset_type.value
|
|
if dataset_type == "FI-2010":
|
|
path = cst.DATA_DIR + "/FI_2010"
|
|
train_input, train_labels, val_input, val_labels, test_input, test_labels = fi_2010_load(path, seq_size, horizon, config.model.hyperparameters_fixed["all_features"])
|
|
data_module = DataModule(
|
|
train_set=Dataset(train_input, train_labels, seq_size),
|
|
val_set=Dataset(val_input, val_labels, seq_size),
|
|
test_set=Dataset(test_input, test_labels, seq_size),
|
|
batch_size=config.experiment.batch_size,
|
|
test_batch_size=config.experiment.batch_size*4,
|
|
num_workers=4
|
|
)
|
|
test_loaders = [data_module.test_dataloader()]
|
|
else:
|
|
for i in range(len(training_stocks)):
|
|
if i == 0:
|
|
for j in range(2):
|
|
if j == 0:
|
|
path = cst.DATA_DIR + "/" + training_stocks[i] + "/train.npy"
|
|
train_input, train_labels = lobster_load(path, config.model.hyperparameters_fixed["all_features"], cst.LEN_SMOOTH, horizon, seq_size)
|
|
if j == 1:
|
|
path = cst.DATA_DIR + "/" + training_stocks[i] + "/val.npy"
|
|
val_input, val_labels = lobster_load(path, config.model.hyperparameters_fixed["all_features"], cst.LEN_SMOOTH, horizon, seq_size)
|
|
else:
|
|
for j in range(2):
|
|
if j == 0:
|
|
path = cst.DATA_DIR + "/" + training_stocks[i] + "/train.npy"
|
|
train_labels = torch.cat((train_labels, torch.zeros(seq_size+horizon-1, dtype=torch.long)), 0)
|
|
train_input_tmp, train_labels_tmp = lobster_load(path, config.model.hyperparameters_fixed["all_features"], cst.LEN_SMOOTH, horizon, seq_size)
|
|
train_input = torch.cat((train_input, train_input_tmp), 0)
|
|
train_labels = torch.cat((train_labels, train_labels_tmp), 0)
|
|
if j == 1:
|
|
path = cst.DATA_DIR + "/" + training_stocks[i] + "/val.npy"
|
|
val_labels = torch.cat((val_labels, torch.zeros(seq_size+horizon-1, dtype=torch.long)), 0)
|
|
val_input_tmp, val_labels_tmp = lobster_load(path, config.model.hyperparameters_fixed["all_features"], cst.LEN_SMOOTH, horizon, seq_size)
|
|
val_input = torch.cat((val_input, val_input_tmp), 0)
|
|
val_labels = torch.cat((val_labels, val_labels_tmp), 0)
|
|
test_loaders = []
|
|
for i in range(len(testing_stocks)):
|
|
path = cst.DATA_DIR + "/" + testing_stocks[i] + "/test.npy"
|
|
test_input, test_labels = lobster_load(path, config.model.hyperparameters_fixed["all_features"], cst.LEN_SMOOTH, horizon, seq_size)
|
|
test_set = Dataset(test_input, test_labels, seq_size)
|
|
test_dataloader = DataLoader(
|
|
dataset=test_set,
|
|
batch_size=config.experiment.batch_size*4,
|
|
shuffle=False,
|
|
pin_memory=True,
|
|
drop_last=False,
|
|
num_workers=4,
|
|
persistent_workers=True
|
|
)
|
|
test_loaders.append(test_dataloader)
|
|
train_set = Dataset(train_input, train_labels, seq_size)
|
|
val_set = Dataset(val_input, val_labels, seq_size)
|
|
counts_train = torch.unique(train_labels, return_counts=True)
|
|
counts_val = torch.unique(val_labels, return_counts=True)
|
|
print("Train set shape: ", train_input.shape)
|
|
print("Val set shape: ", val_input.shape)
|
|
print("Classes counts in train set: ", counts_train[1])
|
|
print("Classes counts in val set: ", counts_val[1])
|
|
print(f"Classes distribution in train set: up {counts_train[1][0]/train_labels.shape[0]} stat {counts_train[1][1]/train_labels.shape[0]} down {counts_train[1][2]/train_labels.shape[0]} ", )
|
|
print(f"Classes distribution in val set: up {counts_val[1][0]/val_labels.shape[0]} stat {counts_val[1][1]/val_labels.shape[0]} down {counts_val[1][2]/val_labels.shape[0]} ", )
|
|
data_module = DataModule(
|
|
train_set=train_set,
|
|
val_set=val_set,
|
|
batch_size=config.experiment.batch_size,
|
|
test_batch_size=config.experiment.batch_size*4,
|
|
num_workers=4
|
|
)
|
|
|
|
experiment_type = config.experiment.type
|
|
if "FINETUNING" in experiment_type or "EVALUATION" in experiment_type:
|
|
checkpoint = torch.load(config.experiment.checkpoint_reference, map_location=cst.DEVICE)
|
|
print("Loading model from checkpoint: ", config.experiment.checkpoint_reference)
|
|
lr = checkpoint["hyper_parameters"]["lr"]
|
|
filename_ckpt = checkpoint["hyper_parameters"]["filename_ckpt"]
|
|
hidden_dim = checkpoint["hyper_parameters"]["hidden_dim"]
|
|
num_layers = checkpoint["hyper_parameters"]["num_layers"]
|
|
optimizer = checkpoint["hyper_parameters"]["optimizer"]
|
|
model_type = checkpoint["hyper_parameters"]["model_type"]
|
|
max_epochs = checkpoint["hyper_parameters"]["max_epochs"]
|
|
horizon = checkpoint["hyper_parameters"]["horizon"]
|
|
seq_size = checkpoint["hyper_parameters"]["seq_size"]
|
|
if model_type == "MLPLOB":
|
|
model = Engine.load_from_checkpoint(
|
|
config.experiment.checkpoint_reference,
|
|
seq_size=seq_size,
|
|
horizon=horizon,
|
|
max_epochs=max_epochs,
|
|
model_type=model_type,
|
|
is_wandb=config.experiment.is_wandb,
|
|
experiment_type=experiment_type,
|
|
lr=lr,
|
|
optimizer=optimizer,
|
|
filename_ckpt=filename_ckpt,
|
|
hidden_dim=hidden_dim,
|
|
num_layers=num_layers,
|
|
num_features=train_input.shape[1],
|
|
dataset_type=dataset_type,
|
|
map_location=cst.DEVICE,
|
|
)
|
|
elif model_type == "TLOB":
|
|
model = Engine.load_from_checkpoint(
|
|
config.experiment.checkpoint_reference,
|
|
seq_size=seq_size,
|
|
horizon=horizon,
|
|
max_epochs=max_epochs,
|
|
model_type=model_type,
|
|
is_wandb=config.experiment.is_wandb,
|
|
experiment_type=experiment_type,
|
|
lr=lr,
|
|
optimizer=optimizer,
|
|
filename_ckpt=filename_ckpt,
|
|
hidden_dim=hidden_dim,
|
|
num_layers=num_layers,
|
|
num_features=train_input.shape[1],
|
|
dataset_type=dataset_type,
|
|
num_heads=checkpoint["hyper_parameters"]["num_heads"],
|
|
is_sin_emb=checkpoint["hyper_parameters"]["is_sin_emb"],
|
|
map_location=cst.DEVICE,
|
|
len_test_dataloader=len(test_loaders[0])
|
|
)
|
|
elif model_type == "BINCTABL":
|
|
model = Engine.load_from_checkpoint(
|
|
config.experiment.checkpoint_reference,
|
|
seq_size=seq_size,
|
|
horizon=horizon,
|
|
max_epochs=max_epochs,
|
|
model_type=model_type,
|
|
is_wandb=config.experiment.is_wandb,
|
|
experiment_type=experiment_type,
|
|
lr=lr,
|
|
optimizer=optimizer,
|
|
filename_ckpt=filename_ckpt,
|
|
num_features=train_input.shape[1],
|
|
dataset_type=dataset_type,
|
|
map_location=cst.DEVICE,
|
|
len_test_dataloader=len(test_loaders[0])
|
|
)
|
|
elif model_type == "DEEPLOB":
|
|
model = Engine.load_from_checkpoint(
|
|
config.experiment.checkpoint_reference,
|
|
seq_size=seq_size,
|
|
horizon=horizon,
|
|
max_epochs=max_epochs,
|
|
model_type=model_type,
|
|
is_wandb=config.experiment.is_wandb,
|
|
experiment_type=experiment_type,
|
|
lr=lr,
|
|
optimizer=optimizer,
|
|
filename_ckpt=filename_ckpt,
|
|
num_features=train_input.shape[1],
|
|
dataset_type=dataset_type,
|
|
map_location=cst.DEVICE,
|
|
len_test_dataloader=len(test_loaders[0])
|
|
)
|
|
|
|
else:
|
|
if model_type == cst.ModelType.MLPLOB:
|
|
model = Engine(
|
|
seq_size=seq_size,
|
|
horizon=horizon,
|
|
max_epochs=config.experiment.max_epochs,
|
|
model_type=config.model.type.value,
|
|
is_wandb=config.experiment.is_wandb,
|
|
experiment_type=experiment_type,
|
|
lr=config.model.hyperparameters_fixed["lr"],
|
|
optimizer=config.experiment.optimizer,
|
|
filename_ckpt=config.experiment.filename_ckpt,
|
|
hidden_dim=config.model.hyperparameters_fixed["hidden_dim"],
|
|
num_layers=config.model.hyperparameters_fixed["num_layers"],
|
|
num_features=train_input.shape[1],
|
|
dataset_type=dataset_type,
|
|
len_test_dataloader=len(test_loaders[0])
|
|
)
|
|
elif model_type == cst.ModelType.TLOB:
|
|
model = Engine(
|
|
seq_size=seq_size,
|
|
horizon=horizon,
|
|
max_epochs=config.experiment.max_epochs,
|
|
model_type=config.model.type.value,
|
|
is_wandb=config.experiment.is_wandb,
|
|
experiment_type=experiment_type,
|
|
lr=config.model.hyperparameters_fixed["lr"],
|
|
optimizer=config.experiment.optimizer,
|
|
filename_ckpt=config.experiment.filename_ckpt,
|
|
hidden_dim=config.model.hyperparameters_fixed["hidden_dim"],
|
|
num_layers=config.model.hyperparameters_fixed["num_layers"],
|
|
num_features=train_input.shape[1],
|
|
dataset_type=dataset_type,
|
|
num_heads=config.model.hyperparameters_fixed["num_heads"],
|
|
is_sin_emb=config.model.hyperparameters_fixed["is_sin_emb"],
|
|
len_test_dataloader=len(test_loaders[0])
|
|
)
|
|
elif model_type == cst.ModelType.BINCTABL:
|
|
model = Engine(
|
|
seq_size=seq_size,
|
|
horizon=horizon,
|
|
max_epochs=config.experiment.max_epochs,
|
|
model_type=config.model.type.value,
|
|
is_wandb=config.experiment.is_wandb,
|
|
experiment_type=experiment_type,
|
|
lr=config.model.hyperparameters_fixed["lr"],
|
|
optimizer=config.experiment.optimizer,
|
|
filename_ckpt=config.experiment.filename_ckpt,
|
|
num_features=train_input.shape[1],
|
|
dataset_type=dataset_type,
|
|
len_test_dataloader=len(test_loaders[0])
|
|
)
|
|
elif model_type == cst.ModelType.DEEPLOB:
|
|
model = Engine(
|
|
seq_size=seq_size,
|
|
horizon=horizon,
|
|
max_epochs=config.experiment.max_epochs,
|
|
model_type=config.model.type.value,
|
|
is_wandb=config.experiment.is_wandb,
|
|
experiment_type=experiment_type,
|
|
lr=config.model.hyperparameters_fixed["lr"],
|
|
optimizer=config.experiment.optimizer,
|
|
filename_ckpt=config.experiment.filename_ckpt,
|
|
num_features=train_input.shape[1],
|
|
dataset_type=dataset_type,
|
|
len_test_dataloader=len(test_loaders[0])
|
|
)
|
|
|
|
print("total number of parameters: ", sum(p.numel() for p in model.parameters()))
|
|
train_dataloader, val_dataloader = data_module.train_dataloader(), data_module.val_dataloader()
|
|
|
|
if "TRAINING" in experiment_type or "FINETUNING" in experiment_type:
|
|
trainer.fit(model, train_dataloader, val_dataloader)
|
|
best_model_path = model.last_path_ckpt
|
|
print("Best model path: ", best_model_path)
|
|
try:
|
|
best_model = Engine.load_from_checkpoint(best_model_path, map_location=cst.DEVICE)
|
|
except:
|
|
print("no checkpoints has been saved, selecting the last model")
|
|
best_model = model
|
|
best_model.experiment_type = "EVALUATION"
|
|
for i in range(len(test_loaders)):
|
|
test_dataloader = test_loaders[i]
|
|
output = trainer.test(best_model, test_dataloader)
|
|
if run is not None and dataset_type == "LOBSTER":
|
|
run.log({f"f1 {testing_stocks[i]} best": output[0]["f1_score"]}, commit=False)
|
|
elif run is not None and dataset_type == "FI-2010":
|
|
run.log({f"f1 FI-2010 ": output[0]["f1_score"]}, commit=False)
|
|
else:
|
|
for i in range(len(test_loaders)):
|
|
test_dataloader = test_loaders[i]
|
|
output = trainer.test(model, test_dataloader)
|
|
if run is not None and dataset_type == "LOBSTER":
|
|
run.log({f"f1 {testing_stocks[i]} best": output[0]["f1_score"]}, commit=False)
|
|
elif run is not None and dataset_type == "FI-2010":
|
|
run.log({f"f1 FI-2010 ": output[0]["f1_score"]}, commit=False)
|
|
|
|
|
|
|
|
def run_wandb(config: Config, accelerator):
|
|
def wandb_sweep_callback():
|
|
wandb_logger = WandbLogger(project=cst.PROJECT_NAME, log_model=False, save_dir=cst.DIR_SAVED_MODEL)
|
|
run_name = None
|
|
if not config.experiment.is_sweep:
|
|
run_name = ""
|
|
for param in config.model.keys():
|
|
value = config.model[param]
|
|
if param == "hyperparameters_sweep":
|
|
continue
|
|
if type(value) == omegaconf.dictconfig.DictConfig:
|
|
for key in value.keys():
|
|
run_name += str(key[:2]) + "_" + str(value[key]) + "_"
|
|
else:
|
|
run_name += str(param[:2]) + "_" + str(value.value) + "_"
|
|
|
|
run = wandb.init(project=cst.PROJECT_NAME, name=run_name, entity="")
|
|
|
|
if config.experiment.is_sweep:
|
|
model_params = run.config
|
|
else:
|
|
model_params = config.model.hyperparameters_fixed
|
|
wandb_instance_name = ""
|
|
for param in config.model.hyperparameters_fixed.keys():
|
|
if param in model_params:
|
|
config.model.hyperparameters_fixed[param] = model_params[param]
|
|
wandb_instance_name += str(param) + "_" + str(model_params[param]) + "_"
|
|
|
|
|
|
|
|
run.name = wandb_instance_name
|
|
seq_size = config.model.hyperparameters_fixed["seq_size"]
|
|
horizon = config.experiment.horizon
|
|
dataset = config.experiment.dataset_type.value
|
|
training_stocks = config.experiment.training_stocks
|
|
if dataset == "LOBSTER":
|
|
config.experiment.filename_ckpt = f"{dataset}_{training_stocks}_seq_size_{seq_size}_horizon_{horizon}_{run_name}"
|
|
else:
|
|
config.experiment.filename_ckpt = f"{dataset}_seq_size_{seq_size}_horizon_{horizon}_{run_name}"
|
|
wandb_instance_name = config.experiment.filename_ckpt
|
|
|
|
trainer = L.Trainer(
|
|
accelerator=accelerator,
|
|
precision=cst.PRECISION,
|
|
max_epochs=config.experiment.max_epochs,
|
|
callbacks=[
|
|
EarlyStopping(monitor="val_loss", mode="min", patience=2, verbose=True, min_delta=0.002),
|
|
TQDMProgressBar(refresh_rate=1000)
|
|
],
|
|
num_sanity_val_steps=0,
|
|
logger=wandb_logger,
|
|
detect_anomaly=False,
|
|
check_val_every_n_epoch=1,
|
|
)
|
|
|
|
|
|
run.log({"model": config.model.type.value}, commit=False)
|
|
run.log({"dataset": config.experiment.dataset_type.value}, commit=False)
|
|
run.log({"seed": config.experiment.seed}, commit=False)
|
|
run.log({"all_features": config.model.hyperparameters_fixed["all_features"]}, commit=False)
|
|
if config.experiment.dataset_type == cst.Dataset.LOBSTER:
|
|
for i in range(len(config.experiment.training_stocks)):
|
|
run.log({f"training stock{i}": config.experiment.training_stocks[i]}, commit=False)
|
|
for i in range(len(config.experiment.testing_stocks)):
|
|
run.log({f"testing stock{i}": config.experiment.testing_stocks[i]}, commit=False)
|
|
run.log({"sampling_type": config.experiment.sampling_type}, commit=False)
|
|
if config.experiment.sampling_type == "time":
|
|
run.log({"sampling_time": config.experiment.sampling_time}, commit=False)
|
|
else:
|
|
run.log({"sampling_quantity": config.experiment.sampling_quantity}, commit=False)
|
|
train(config, trainer, run)
|
|
run.finish()
|
|
|
|
return wandb_sweep_callback
|
|
|
|
|
|
def sweep_init(config: Config):
|
|
|
|
wandb.login()
|
|
parameters = {}
|
|
for key in config.model.hyperparameters_sweep.keys():
|
|
parameters[key] = {'values': list(config.model.hyperparameters_sweep[key])}
|
|
sweep_config = {
|
|
'method': 'grid',
|
|
'metric': {
|
|
'goal': 'minimize',
|
|
'name': 'val_loss'
|
|
},
|
|
'early_terminate': {
|
|
'type': 'hyperband',
|
|
'min_iter': 3,
|
|
'eta': 1.5
|
|
},
|
|
'run_cap': 100,
|
|
'parameters': {**parameters}
|
|
}
|
|
return sweep_config
|
|
|
|
|
|
def print_setup(config: Config):
|
|
print("Model type: ", config.model.type)
|
|
print("Dataset: ", config.experiment.dataset_type)
|
|
print("Seed: ", config.experiment.seed)
|
|
print("Sequence size: ", config.model.hyperparameters_fixed["seq_size"])
|
|
print("Horizon: ", config.experiment.horizon)
|
|
print("All features: ", config.model.hyperparameters_fixed["all_features"])
|
|
print("Is data preprocessed: ", config.experiment.is_data_preprocessed)
|
|
print("Is wandb: ", config.experiment.is_wandb)
|
|
print("Is sweep: ", config.experiment.is_sweep)
|
|
print(config.experiment.type)
|
|
print("Is debug: ", config.experiment.is_debug)
|
|
if config.experiment.dataset_type == cst.Dataset.LOBSTER:
|
|
print("Training stocks: ", config.experiment.training_stocks)
|
|
print("Testing stocks: ", config.experiment.testing_stocks)
|
|
|
|
|