File size: 6,657 Bytes
bcf646b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
import torch
import random
import os
import sys
import logging
import numpy as np
import pandas as pd
from shutil import copy
from datetime import datetime
import matplotlib.pyplot as plt
import collections
import umap
import umap.plot
from matplotlib.colors import ListedColormap
from sklearn.metrics import classification_report, accuracy_score
def count_parameters(model):
    """Return the total number of elements across the trainable parameters of *model*.

    A parameter is counted only when its ``requires_grad`` flag is truthy;
    its contribution is whatever ``numel()`` reports.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total
class AverageMeter(object):
    """Keep the most recent value together with a running weighted average."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Set the last value, average, weighted sum and sample count back to 0."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` as the latest value, weighted by ``n`` samples.

        ``sum`` grows by ``val * n``, ``count`` grows by ``n`` and ``avg`` is
        recomputed as ``sum / count``.
        """
        self.val = val
        weighted_val = val * n
        self.sum += weighted_val
        self.count += n
        self.avg = self.sum / self.count
def fix_randomness(SEED):
    """Seed Python, NumPy and PyTorch (CPU and CUDA) RNGs and pin cuDNN flags.

    After seeding, cuDNN is switched to deterministic mode and its
    benchmark mode is turned off.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
    )
    for seed_fn in seeders:
        seed_fn(SEED)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False
def _logger(logger_name, level=logging.DEBUG):
"""
Method to return a custom logger with the given name and level
:param logger_name:
:param level:
:return:
"""
logger = logging.getLogger(logger_name)
logger.setLevel(level)
format_string = "%(message)s"
log_format = logging.Formatter(format_string)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(log_format)
logger.addHandler(console_handler)
# Creating and adding the file handler
file_handler = logging.FileHandler(logger_name, mode='a')
file_handler.setFormatter(log_format)
logger.addHandler(file_handler)
return logger
def starting_logs(data_type, exp_log_dir, seed_id):
    """Create the per-seed log directory, open a timestamped log and write its banner.

    The directory is ``<exp_log_dir>/_seed_<seed_id>`` and is created if it
    does not exist. The log file inside it is named after the current local
    time. The banner records the data type and the seed id.

    :return: tuple ``(logger, log_dir)``.
    """
    log_dir = os.path.join(exp_log_dir, "_seed_" + str(seed_id))
    os.makedirs(log_dir, exist_ok=True)

    timestamp = datetime.now().strftime('%d_%m_%Y_%H_%M_%S')
    logger = _logger(os.path.join(log_dir, f"logs_{timestamp}.log"))

    separator = "=" * 45
    banner = (
        separator,
        f'Logging data type {data_type}',
        separator,
        f'Logging seed id {seed_id}',
        separator,
    )
    for line in banner:
        logger.debug(line)
    return logger, log_dir
def save_checkpoint(exp_log_dir, model, dataset, dataset_configs, hparams, status):
    """Serialize the model weights and run metadata to ``checkpoint_<status>.pt``.

    The saved dictionary has four entries: ``dataset`` (as passed),
    ``configs`` (the ``__dict__`` of ``dataset_configs``), ``hparams``
    (copied into a plain ``dict``) and ``model`` (``model.state_dict()``).
    The file is written into ``exp_log_dir`` with ``torch.save``.
    """
    payload = dict(
        dataset=dataset,
        configs=dataset_configs.__dict__,
        hparams=dict(hparams),
        model=model.state_dict(),
    )
    checkpoint_path = os.path.join(exp_log_dir, f"checkpoint_{status}.pt")
    torch.save(payload, checkpoint_path)
def _calc_metrics(pred_labels, true_labels, classes_names):
    """Return ``(accuracy, macro-averaged F1)``, both scaled to percentages.

    Both label collections are converted to integer NumPy arrays first.
    ``classes_names`` is forwarded to ``classification_report`` as
    ``target_names``.
    """
    y_pred = np.array(pred_labels).astype(int)
    y_true = np.array(true_labels).astype(int)

    report = classification_report(
        y_true,
        y_pred,
        target_names=classes_names,
        digits=6,
        output_dict=True,
    )
    macro_f1 = report["macro avg"]["f1-score"]
    acc = accuracy_score(y_true, y_pred)
    return acc * 100, macro_f1 * 100
def _save_metrics(pred_labels, true_labels, log_dir, status):
    """Write the classification report to ``classification_report_<status>.xlsx``.

    The report dictionary is turned into a DataFrame, its ``accuracy``
    column is set to the overall accuracy score, and the whole frame is
    multiplied by 100 before it is exported to ``log_dir``.
    NOTE(review): the multiplication applies to every cell, so any
    non-ratio rows in the report (e.g. support counts) are scaled as well.
    """
    y_pred = np.array(pred_labels).astype(int)
    y_true = np.array(true_labels).astype(int)

    report_df = pd.DataFrame(
        classification_report(y_true, y_pred, digits=6, output_dict=True)
    )
    report_df["accuracy"] = accuracy_score(y_true, y_pred)
    report_df = report_df * 100

    report_save_path = os.path.join(log_dir, f"classification_report_{status}.xlsx")
    report_df.to_excel(report_save_path)
def to_device(input, device):
    """Recursively move every tensor found in ``input`` to ``device``.

    :param input: a tensor, a string, a mapping or a sequence. Mappings and
        sequences are walked recursively and may nest in any combination.
    :param device: target device, forwarded to ``Tensor.to(device=...)``.
    :return: the moved tensor; strings unchanged; mappings rebuilt as a
        ``dict`` with the same keys; sequences rebuilt as a ``list``.
    :raises TypeError: if a value is none of the supported kinds. The
        message names the offending type.
    """
    if torch.is_tensor(input):
        return input.to(device=device)
    # Strings are sequences too; return them before the Sequence branch so
    # they are not split into characters.
    if isinstance(input, str):
        return input
    if isinstance(input, collections.abc.Mapping):
        return {k: to_device(sample, device=device) for k, sample in input.items()}
    if isinstance(input, collections.abc.Sequence):
        return [to_device(sample, device=device) for sample in input]
    # The original message lacked the f-prefix and printed the placeholder
    # literally instead of the actual type.
    raise TypeError(f"Input must contain tensor, dict or list, found {type(input)}")
def copy_files(destination):
    """Back up a fixed list of project source files under ``destination``.

    The files are copied into ``<destination>/MODEL_BACKUP_FILES`` (created
    if missing). Source paths are relative to the current working
    directory; each file is stored under the target name listed beside it.
    """
    destination_dir = os.path.join(destination, "MODEL_BACKUP_FILES")
    os.makedirs(destination_dir, exist_ok=True)

    # (source path, file name inside the backup directory), copied in order.
    backup_plan = (
        ("main.py", "main.py"),
        ("data/dataloader.py", "dataloader.py"),
        ("data/dataset.py", "dataset.py"),
        ("models/MoE_ECGFormer.py", "models.py"),
        ("configs/data_configs.py", "data_configs.py"),
        ("configs/hparams.py", "hparams.py"),
        ("train.py", "train.py"),
        ("utils.py", "utils.py"),
    )
    for source_path, target_name in backup_plan:
        copy(source_path, os.path.join(destination_dir, target_name))
def _plot_umap(model, data_loader, device, save_dir):
    """Embed the model's features with UMAP and save a class-coloured scatter plot.

    The full ``x_data`` tensor of ``data_loader.dataset`` is passed through
    ``model[0]`` and then ``model[1]`` in a single forward pass without
    gradients. The resulting features are reduced with a default
    ``umap.UMAP()`` and drawn as a scatter plot that is written to
    ``<save_dir>/umap_plots/umap_.png``.

    :param model: indexable object; ``model[0]`` and ``model[1]`` are called
        in sequence.
    :param data_loader: loader whose ``dataset`` exposes ``x_data`` and
        ``y_data`` tensors.
    :param device: device the input data is moved to before the forward pass.
    :param save_dir: directory under which ``umap_plots`` is created.

    NOTE(review): the class names and the five-colour palette are
    hard-coded, so labels are assumed to be integers in ``0..4`` — confirm
    against the dataset. The legend pairs handles with names positionally,
    which presumably assumes every class occurs in the data.
    """
    classes_names = ['N', 'S', 'V', 'F', 'Q']
    font = {'family': 'Times New Roman',
            'weight': 'bold',
            'size': 17}
    # This updates Matplotlib's global rc settings, not just this figure.
    plt.rc('font', **font)

    with torch.no_grad():
        data = data_loader.dataset.x_data.float().to(device)
        labels = data_loader.dataset.y_data.view((-1)).long()
        out = model[0](data)
        features = model[1](out)

    # makedirs(exist_ok=True) replaces the race-prone exists()/mkdir() pair.
    plots_dir = os.path.join(save_dir, "umap_plots")
    os.makedirs(plots_dir, exist_ok=True)

    model_reducer = umap.UMAP()
    embedding = model_reducer.fit_transform(features.detach().cpu().numpy())

    # Scale the labels into [0, 1] so they index the colormap below.
    norm_labels = labels / float(len(classes_names) - 1)

    # Pick five entries of the 12-colour "Paired" palette.
    # plt.cm.get_cmap was removed in Matplotlib 3.9; pyplot.get_cmap takes
    # the same (name, lut) arguments and is still available.
    paired = plt.get_cmap('Paired', 12)
    new_colors = [paired(0), paired(1), paired(2), paired(4), paired(6)]
    new_cmap = ListedColormap(new_colors)

    print("Plotting UMAP ...")
    plt.figure(figsize=(16, 10))
    scatter = plt.scatter(embedding[:, 0], embedding[:, 1], c=norm_labels, cmap=new_cmap, s=15)
    handles, _ = scatter.legend_elements(prop='colors')
    plt.legend(handles, classes_names, title="Classes")

    file_name = "umap_.png"
    fig_save_name = os.path.join(plots_dir, file_name)
    plt.xticks([])
    plt.yticks([])
    plt.savefig(fig_save_name, bbox_inches='tight')
    plt.close()
|