| """ | |
| This function is adapted from [FITS] by [VEWOXIC] | |
| Original source: [https://github.com/VEWOXIC/FITS] | |
| """ | |
import math

import numpy as np
import torch
import torch.nn as nn
import torchinfo
import tqdm
from torch import optim
from torch.utils.data import DataLoader

from ..utils.torch_utility import EarlyStoppingTorch, get_gpu
from ..utils.dataset import ReconstructDataset


class Model(nn.Module):
    # FITS: Frequency Interpolation Time Series model.
    def __init__(self, seq_len, pred_len, individual, enc_in, cut_freq):
        super(Model, self).__init__()
        self.seq_len = seq_len
        self.pred_len = pred_len
        self.individual = individual
        self.channels = enc_in
        self.dominance_freq = cut_freq  # e.g. 720/24
        self.length_ratio = (self.seq_len + self.pred_len) / self.seq_len

        # Complex-valued linear layer(s) for frequency-domain upsampling.
        if self.individual:
            # One upsampler per channel.
            self.freq_upsampler = nn.ModuleList()
            for i in range(self.channels):
                self.freq_upsampler.append(
                    nn.Linear(self.dominance_freq,
                              int(self.dominance_freq * self.length_ratio)).to(torch.cfloat))
        else:
            # A single upsampler shared across all channels.
            self.freq_upsampler = nn.Linear(
                self.dominance_freq,
                int(self.dominance_freq * self.length_ratio)).to(torch.cfloat)

    def forward(self, x):
        # RIN: reversible instance normalization.
        x_mean = torch.mean(x, dim=1, keepdim=True)
        x = x - x_mean
        x_var = torch.var(x, dim=1, keepdim=True) + 1e-5
        x = x / torch.sqrt(x_var)

        # Low-pass filter: keep only the first dominance_freq rFFT bins.
        low_specx = torch.fft.rfft(x, dim=1)
        low_specx = low_specx[:, 0:self.dominance_freq, :]

        # Upsample the truncated spectrum with the complex linear layer(s).
        if self.individual:
            low_specxy_ = torch.zeros(
                [low_specx.size(0), int(self.dominance_freq * self.length_ratio), low_specx.size(2)],
                dtype=low_specx.dtype, device=low_specx.device)
            for i in range(self.channels):
                low_specxy_[:, :, i] = self.freq_upsampler[i](low_specx[:, :, i])
        else:
            low_specxy_ = self.freq_upsampler(low_specx.permute(0, 2, 1)).permute(0, 2, 1)

        # Zero-pad to the full output spectrum and transform back.
        low_specxy = torch.zeros(
            [low_specxy_.size(0), int((self.seq_len + self.pred_len) / 2 + 1), low_specxy_.size(2)],
            dtype=low_specxy_.dtype, device=low_specxy_.device)
        low_specxy[:, 0:low_specxy_.size(1), :] = low_specxy_
        low_xy = torch.fft.irfft(low_specxy, dim=1)
        low_xy = low_xy * self.length_ratio  # energy compensation for the length change

        # Reverse RIN.
        xy = low_xy * torch.sqrt(x_var) + x_mean
        return xy, low_xy * torch.sqrt(x_var)
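

# Illustrative sketch, not part of the original FITS code: a quick sanity
# check of the shape arithmetic above. With seq_len=25, pred_len=75 and
# cut_freq=12, the rFFT of the input has 13 bins, the first 12 are kept,
# the complex linear layer maps them to int(12 * 4) = 48 bins, and the
# zero-padded inverse rFFT yields a series of length 100. The helper name
# below is hypothetical and exists only for this demonstration.
def _demo_fits_shapes():
    model = Model(seq_len=25, pred_len=75, individual=False, enc_in=1, cut_freq=12)
    x = torch.randn(2, 25, 1)  # (batch, seq_len, channels)
    xy, low_xy = model(x)
    assert xy.shape == (2, 100, 1)      # upsampled to seq_len + pred_len
    assert low_xy.shape == (2, 100, 1)  # same series before the mean is added back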


class FITS():
    def __init__(self,
                 win_size=100,
                 DSR=4,
                 individual=True,
                 input_c=1,
                 batch_size=128,
                 cut_freq=12,
                 epochs=50,
                 lr=1e-3,
                 validation_size=0.2):
        super().__init__()
        self.__anomaly_score = None

        self.cuda = True
        self.device = get_gpu(self.cuda)

        self.win_size = win_size
        self.DSR = DSR  # downsampling rate: the model sees every DSR-th point
        self.individual = individual
        self.input_c = input_c
        self.batch_size = batch_size
        self.cut_freq = cut_freq
        self.validation_size = validation_size

        # Reconstruct the full window (win_size points) from the downsampled
        # input (win_size // DSR points).
        self.model = Model(
            seq_len=self.win_size // self.DSR,
            pred_len=self.win_size - self.win_size // self.DSR,
            individual=self.individual,
            enc_in=self.input_c,
            cut_freq=self.cut_freq).to(self.device)

        self.epochs = epochs
        self.learning_rate = lr
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        self.scheduler = optim.lr_scheduler.StepLR(self.optimizer, step_size=5, gamma=0.75)
        self.loss = nn.MSELoss()
        # reduction='none' keeps the per-element errors needed for scoring.
        self.anomaly_criterion = nn.MSELoss(reduction='none')

        self.save_path = None
        self.early_stopping = EarlyStoppingTorch(save_path=self.save_path, patience=3)

    def fit(self, data):
        tsTrain = data[:int((1 - self.validation_size) * len(data))]
        tsValid = data[int((1 - self.validation_size) * len(data)):]

        train_loader = DataLoader(
            dataset=ReconstructDataset(tsTrain, window_size=self.win_size),
            batch_size=self.batch_size,
            shuffle=True)
        valid_loader = DataLoader(
            dataset=ReconstructDataset(tsValid, window_size=self.win_size),
            batch_size=self.batch_size,
            shuffle=False)

        for epoch in range(1, self.epochs + 1):
            # Training pass: downsampled inputs against full-window targets.
            self.model.train(mode=True)
            avg_loss = 0
            loop = tqdm.tqdm(enumerate(train_loader), total=len(train_loader), leave=True)
            for idx, (x, target) in loop:
                x = x[:, ::self.DSR, :]  # keep every DSR-th point
                x, target = x.to(self.device), target.to(self.device)

                self.optimizer.zero_grad()
                output, _ = self.model(x)
                loss = self.loss(output, target)
                loss.backward()
                self.optimizer.step()

                avg_loss += loss.cpu().item()
                loop.set_description(f'Training Epoch [{epoch}/{self.epochs}]')
                loop.set_postfix(loss=loss.item(), avg_loss=avg_loss / (idx + 1))

            # Validation pass.
            self.model.eval()
            avg_loss = 0
            loop = tqdm.tqdm(enumerate(valid_loader), total=len(valid_loader), leave=True)
            with torch.no_grad():
                for idx, (x, target) in loop:
                    x = x[:, ::self.DSR, :]
                    x, target = x.to(self.device), target.to(self.device)
                    output, _ = self.model(x)
                    loss = self.loss(output, target)

                    avg_loss += loss.cpu().item()
                    loop.set_description(f'Validation Epoch [{epoch}/{self.epochs}]')
                    loop.set_postfix(loss=loss.item(), avg_loss=avg_loss / (idx + 1))

            valid_loss = avg_loss / max(len(valid_loader), 1)
            self.scheduler.step()

            self.early_stopping(valid_loss, self.model)
            if self.early_stopping.early_stop:
                print("   Early stopping<<<")
                break

    def decision_function(self, data):
        test_loader = DataLoader(
            dataset=ReconstructDataset(data, window_size=self.win_size),
            batch_size=self.batch_size,
            shuffle=False)

        self.model.eval()
        scores = []
        loop = tqdm.tqdm(enumerate(test_loader), total=len(test_loader), leave=True)
        with torch.no_grad():
            for idx, (x, target) in loop:
                x = x[:, ::self.DSR, :]
                x, target = x.to(self.device), target.to(self.device)
                output, _ = self.model(x)
                # Per-time-step reconstruction error, averaged over channels;
                # keep only the score of the last point in each window.
                score = torch.mean(self.anomaly_criterion(output, target), dim=-1)
                scores.append(score.cpu()[:, -1])
                loop.set_description('Testing: ')

        scores = torch.cat(scores, dim=0)
        scores = scores.numpy().flatten()
        assert scores.ndim == 1

        self.__anomaly_score = scores

        # Pad the score sequence back to the length of the input series.
        if self.__anomaly_score.shape[0] < len(data):
            self.__anomaly_score = np.array(
                [self.__anomaly_score[0]] * math.ceil((self.win_size - 1) / 2) +
                list(self.__anomaly_score) +
                [self.__anomaly_score[-1]] * ((self.win_size - 1) // 2))

        return self.__anomaly_score

    def anomaly_score(self) -> np.ndarray:
        return self.__anomaly_score

    def param_statistic(self, save_file):
        # The model consumes a downsampled window of shape
        # (batch, win_size // DSR, input_c).
        model_stats = torchinfo.summary(
            self.model,
            (self.batch_size, self.win_size // self.DSR, self.input_c),
            verbose=0)
        with open(save_file, 'w') as f:
            f.write(str(model_stats))
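

# Illustrative usage sketch, not part of the original file. Because this
# module uses relative imports, it must be imported from within its package
# rather than run as a script. The sketch fits the detector on a clean sine
# wave and scores a copy containing an injected spike; the spike region
# should receive the highest anomaly scores. The helper name, the synthetic
# data, and the assumption that fit/decision_function accept a 2-D
# (n_points, input_c) array are all hypothetical.
def _demo_fits_detector():
    t = np.arange(2000, dtype=np.float32)
    train_series = np.sin(0.05 * t).reshape(-1, 1)  # (n_points, input_c)
    test_series = train_series.copy()
    test_series[1000:1005] += 3.0  # injected anomaly

    detector = FITS(win_size=100, DSR=4, input_c=1, epochs=5)
    detector.fit(train_series)
    scores = detector.decision_function(test_series)
    print(scores.shape, scores[995:1010])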