"""Training utilities: LR scheduling, early stopping, scaling, plotting,
anomaly-label adjustment, batch collation and stdout silencing."""

import collections
import collections.abc
import os
import sys

import numpy as np
import torch
import matplotlib.pyplot as plt
import pandas as pd
from torch.utils.data._utils.collate import np_str_obj_array_pattern, default_collate_err_msg_format

plt.switch_backend('agg')


def adjust_learning_rate(optimizer, epoch, args):
    """Set the learning rate of every param group according to a schedule.

    Args:
        optimizer: object exposing ``param_groups`` (a list of dicts with an
            ``'lr'`` entry), e.g. a ``torch.optim`` optimizer.
        epoch: current epoch number used to look up the schedule.
        args: object with attributes ``lradj`` (``'type1'`` or ``'type2'``)
            and ``learning_rate`` (base rate, used by ``'type1'``).

    Raises:
        ValueError: if ``args.lradj`` is not a supported schedule name.
    """
    if args.lradj == 'type1':
        # Halve the base learning rate after every epoch (epoch 1 -> base).
        lr_adjust = {epoch: args.learning_rate * (0.5 ** ((epoch - 1) // 1))}
    elif args.lradj == 'type2':
        # Fixed step table: the rate only changes on the listed epochs.
        lr_adjust = {
            2: 5e-5, 4: 1e-5, 6: 5e-6, 8: 1e-6,
            10: 5e-7, 15: 1e-7, 20: 5e-8,
        }
    else:
        # Previously this fell through and failed later with an opaque
        # UnboundLocalError on ``lr_adjust``.
        raise ValueError(
            "Unsupported lradj {!r}; expected 'type1' or 'type2'".format(args.lradj)
        )

    if epoch in lr_adjust:
        lr = lr_adjust[epoch]
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr
        print('Updating learning rate to {}'.format(lr))


class EarlyStopping:
    """Track validation loss, checkpoint on improvement, flag when to stop.

    Call the instance once per validation round. ``early_stop`` becomes
    ``True`` once ``patience`` consecutive calls failed to improve.
    """

    def __init__(self, patience=7, verbose=False, delta=0):
        """
        Args:
            patience: number of consecutive non-improving calls tolerated.
            verbose: if true, print a message each time a checkpoint is saved.
            delta: margin added to the best score when testing for improvement.
        """
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # ``np.Inf`` was removed in NumPy 2.0; ``np.inf`` works on all versions.
        self.val_loss_min = np.inf
        self.delta = delta

    def __call__(self, val_loss, model, path):
        """Record ``val_loss``; save ``model`` under ``path`` if it improved."""
        score = -val_loss  # higher score == lower loss
        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model, path)
        elif score < self.best_score + self.delta:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model, path)
            self.counter = 0

    def save_checkpoint(self, val_loss, model, path):
        """Write ``model.state_dict()`` to ``<path>/checkpoint.pth``."""
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), os.path.join(path, 'checkpoint.pth'))
        self.val_loss_min = val_loss


class dotdict(dict):
    """dot.notation access to dictionary attributes.

    Reading a missing attribute returns ``None`` (``dict.get`` semantics)
    instead of raising ``AttributeError``.
    """
    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__


class StandardScaler():
    """Affine scaler using a caller-supplied mean and standard deviation."""

    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def transform(self, data):
        """Return ``(data - mean) / std``."""
        return (data - self.mean) / self.std

    def inverse_transform(self, data):
        """Return ``data * std + mean`` (inverse of :meth:`transform`)."""
        return (data * self.std) + self.mean


def visual(true, preds=None, name='./pic/test.pdf'):
    """
    Results visualization

    Plot ``true`` (and ``preds`` when given) and save the figure to ``name``.
    """
    fig = plt.figure()
    plt.plot(true, label='GroundTruth', linewidth=2)
    if preds is not None:
        plt.plot(preds, label='Prediction', linewidth=2)
    plt.legend()
    plt.savefig(name, bbox_inches='tight')
    # Close the figure so repeated calls do not accumulate open figures.
    plt.close(fig)


def adjustment(gt, pred):
    """Expand predictions so a detected anomaly segment is fully flagged.

    Whenever ``pred`` hits a point inside a contiguous run of ``gt == 1``,
    every point of that run is set to 1 in ``pred``.

    Args:
        gt: indexable sequence of 0/1 ground-truth labels.
        pred: mutable indexable sequence of 0/1 predictions, same length as
            ``gt``. **Modified in place.**

    Returns:
        The tuple ``(gt, pred)`` (the same objects that were passed in).
    """
    anomaly_state = False
    for i in range(len(gt)):
        if gt[i] == 1 and pred[i] == 1 and not anomaly_state:
            anomaly_state = True
            # Back-fill towards the start of the segment. The stop value is -1
            # so that index 0 is included (``range(i, 0, -1)`` skipped it).
            for j in range(i, -1, -1):
                if gt[j] == 0:
                    break
                if pred[j] == 0:
                    pred[j] = 1
            # Forward-fill to the end of the segment.
            for j in range(i, len(gt)):
                if gt[j] == 0:
                    break
                if pred[j] == 0:
                    pred[j] = 1
        elif gt[i] == 0:
            anomaly_state = False
        if anomaly_state:
            pred[i] = 1
    return gt, pred


def cal_accuracy(y_pred, y_true):
    """Return the mean of ``y_pred == y_true``.

    NOTE(review): presumably called with NumPy arrays so that ``==`` compares
    elementwise; with plain lists ``==`` yields a single bool.
    """
    return np.mean(y_pred == y_true)


def custom_collate(batch):
    r"""source: pytorch 1.9.0, only one modification to original code

    Collate a list of samples into a batch, dispatching on the type of the
    first sample: tensors are stacked, NumPy arrays/scalars and Python numbers
    become tensors, strings are returned unchanged, and mappings, namedtuples
    and sequences are collated recursively.

    Raises:
        TypeError: for unsupported element types (including NumPy arrays of
            strings/objects).
        RuntimeError: if sequence samples in the batch differ in length.
    """
    elem = batch[0]
    elem_type = type(elem)
    if isinstance(elem, torch.Tensor):
        out = None
        if torch.utils.data.get_worker_info() is not None:
            # If we're in a background process, concatenate directly into a
            # shared memory tensor to avoid an extra copy
            numel = sum([x.numel() for x in batch])
            storage = elem.storage()._new_shared(numel)
            out = elem.new(storage)
        return torch.stack(batch, 0, out=out)
    elif elem_type.__module__ == 'numpy' and elem_type.__name__ != 'str_' \
            and elem_type.__name__ != 'string_':
        if elem_type.__name__ == 'ndarray' or elem_type.__name__ == 'memmap':
            # array of string classes and object
            if np_str_obj_array_pattern.search(elem.dtype.str) is not None:
                raise TypeError(default_collate_err_msg_format.format(elem.dtype))
            return custom_collate([torch.as_tensor(b) for b in batch])
        elif elem.shape == ():  # scalars
            return torch.as_tensor(batch)
    elif isinstance(elem, float):
        return torch.tensor(batch, dtype=torch.float64)
    elif isinstance(elem, int):
        return torch.tensor(batch)
    elif isinstance(elem, str):
        return batch
    elif isinstance(elem, collections.abc.Mapping):
        return {key: custom_collate([d[key] for d in batch]) for key in elem}
    elif isinstance(elem, tuple) and hasattr(elem, '_fields'):  # namedtuple
        return elem_type(*(custom_collate(samples) for samples in zip(*batch)))
    elif isinstance(elem, collections.abc.Sequence):
        # check to make sure that the elements in batch have consistent size
        it = iter(batch)
        elem_size = len(next(it))
        if not all(len(elem) == elem_size for elem in it):
            raise RuntimeError('each element in list of batch should be of equal size')
        transposed = zip(*batch)
        return [custom_collate(samples) for samples in transposed]

    raise TypeError(default_collate_err_msg_format.format(elem_type))


class HiddenPrints:
    """Context manager that silences ``print`` output on non-zero ranks.

    Inside the ``with`` block ``sys.stdout`` is redirected to ``os.devnull``
    unless ``rank`` is 0, in which case nothing is changed.
    """

    def __init__(self, rank):
        # A rank of None means single-machine / single-GPU: nothing needs to
        # be hidden, so treat it as rank 0.
        if rank is None:
            rank = 0
        self.rank = rank

    def __enter__(self):
        if self.rank == 0:
            return
        self._original_stdout = sys.stdout
        self._devnull = open(os.devnull, 'w')
        sys.stdout = self._devnull

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.rank == 0:
            return
        # Close the handle we opened ourselves, then restore the real stdout.
        self._devnull.close()
        sys.stdout = self._original_stdout