import torch
import torch.nn as nn
import torch.nn.functional as F
from layers.Embed import DataEmbedding, TemporalEmbedding
from torch import Tensor
from typing import Optional
from collections import namedtuple

# static: time-independent features
# observed: time-varying features known only in the past (e.g. the prediction targets)
# known: time-varying features known in both the past and the future (i.e. time-stamp features)
TypePos = namedtuple('TypePos', ['static', 'observed'])

# To use a new dataset, add the indices of its 'static' and 'observed' columns here.
# 'known' columns need not be listed: 'known' inputs are derived automatically from the time stamps.
datatype_dict = {'ETTh1': TypePos([], [x for x in range(7)]),
                 'ETTm1': TypePos([], [x for x in range(7)])}
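
# A hypothetical example of registering a new dataset: if column 0 were a static
# identifier and columns 1-5 were observed targets, the entry would look like
#   datatype_dict['MyData'] = TypePos([0], [1, 2, 3, 4, 5])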


def get_known_len(embed_type, freq):
    if embed_type != 'timeF':
        if freq == 't':
            return 5
        else:
            return 4
    else:
        freq_map = {'h': 4, 't': 5, 's': 6,
                    'm': 1, 'a': 1, 'w': 2, 'd': 3, 'b': 3}
        return freq_map[freq]
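
# For reference, with the usual frequency codes in this library ('h' hourly,
# 't' minutely): get_known_len('timeF', 'h') -> 4 and get_known_len('fixed', 't') -> 5,
# i.e. the number of time features embedded by the classes below.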


class TFTTemporalEmbedding(TemporalEmbedding):
    def __init__(self, d_model, embed_type='fixed', freq='h'):
        super(TFTTemporalEmbedding, self).__init__(d_model, embed_type, freq)

    def forward(self, x):
        x = x.long()
        # Embed each calendar feature separately and stack them along a new
        # variable axis: [B,T,5,d_model] with minutes, [B,T,4,d_model] without.
        minute_x = self.minute_embed(x[:, :, 4]) if hasattr(
            self, 'minute_embed') else 0.
        hour_x = self.hour_embed(x[:, :, 3])
        weekday_x = self.weekday_embed(x[:, :, 2])
        day_x = self.day_embed(x[:, :, 1])
        month_x = self.month_embed(x[:, :, 0])
        embedding_x = torch.stack([month_x, day_x, weekday_x, hour_x, minute_x], dim=-2) if hasattr(
            self, 'minute_embed') else torch.stack([month_x, day_x, weekday_x, hour_x], dim=-2)
        return embedding_x


class TFTTimeFeatureEmbedding(nn.Module):
    def __init__(self, d_model, embed_type='timeF', freq='h'):
        super(TFTTimeFeatureEmbedding, self).__init__()
        d_inp = get_known_len(embed_type, freq)
        self.embed = nn.ModuleList([nn.Linear(1, d_model, bias=False) for _ in range(d_inp)])

    def forward(self, x):
        # One linear embedding per continuous time feature: [B,T,d_inp,d_model]
        return torch.stack([embed(x[:, :, i].unsqueeze(-1)) for i, embed in enumerate(self.embed)], dim=-2)


class TFTEmbedding(nn.Module):
    def __init__(self, configs):
        super(TFTEmbedding, self).__init__()
        self.pred_len = configs.pred_len
        self.static_pos = datatype_dict[configs.data].static
        self.observed_pos = datatype_dict[configs.data].observed
        self.static_len = len(self.static_pos)
        self.observed_len = len(self.observed_pos)

        self.static_embedding = nn.ModuleList([DataEmbedding(1, configs.d_model, dropout=configs.dropout) for _ in range(self.static_len)]) \
            if self.static_len else None
        self.observed_embedding = nn.ModuleList([DataEmbedding(1, configs.d_model, dropout=configs.dropout) for _ in range(self.observed_len)])
        self.known_embedding = TFTTemporalEmbedding(configs.d_model, configs.embed, configs.freq) \
            if configs.embed != 'timeF' else TFTTimeFeatureEmbedding(configs.d_model, configs.embed, configs.freq)

    def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec):
        if self.static_len:
            # static_input: [B,C,d_model]
            static_input = torch.stack([embed(x_enc[:, :1, self.static_pos[i]].unsqueeze(-1), None).squeeze(1) for i, embed in enumerate(self.static_embedding)], dim=-2)
        else:
            static_input = None

        # observed_input: [B,T,C,d_model]
        observed_input = torch.stack([embed(x_enc[:, :, self.observed_pos[i]].unsqueeze(-1), None) for i, embed in enumerate(self.observed_embedding)], dim=-2)

        # known_input covers both the lookback window and the horizon: [B,T,C,d_model]
        x_mark = torch.cat([x_mark_enc, x_mark_dec[:, -self.pred_len:, :]], dim=-2)
        known_input = self.known_embedding(x_mark)

        return static_input, observed_input, known_input


class GLU(nn.Module):
    def __init__(self, input_size, output_size):
        super().__init__()
        self.fc1 = nn.Linear(input_size, output_size)
        self.fc2 = nn.Linear(input_size, output_size)
        self.glu = nn.GLU()

    def forward(self, x):
        # nn.GLU splits the concatenation back into halves, so this computes
        # fc1(x) * sigmoid(fc2(x)), the gated linear unit used throughout TFT.
        a = self.fc1(x)
        b = self.fc2(x)
        return self.glu(torch.cat([a, b], dim=-1))
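
# Illustrative shapes: GLU(input_size=16, output_size=16) maps [B,T,16] -> [B,T,16];
# the gate lets the network suppress a whole path by driving sigmoid(fc2(x)) toward zero.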


class GateAddNorm(nn.Module):
    def __init__(self, input_size, output_size):
        super(GateAddNorm, self).__init__()
        self.glu = GLU(input_size, input_size)
        self.projection = nn.Linear(input_size, output_size) if input_size != output_size else nn.Identity()
        self.layer_norm = nn.LayerNorm(output_size)

    def forward(self, x, skip_a):
        x = self.glu(x)
        x = x + skip_a
        return self.layer_norm(self.projection(x))


class GRN(nn.Module):
    def __init__(self, input_size, output_size, hidden_size=None, context_size=None, dropout=0.0):
        super(GRN, self).__init__()
        hidden_size = input_size if hidden_size is None else hidden_size
        self.lin_a = nn.Linear(input_size, hidden_size)
        self.lin_c = nn.Linear(context_size, hidden_size) if context_size is not None else None
        self.lin_i = nn.Linear(hidden_size, hidden_size)
        self.dropout = nn.Dropout(dropout)
        self.project_a = nn.Linear(input_size, hidden_size) if hidden_size != input_size else nn.Identity()
        self.gate = GateAddNorm(hidden_size, output_size)

    def forward(self, a: Tensor, c: Optional[Tensor] = None):
        # a: [B,T,d], c: [B,d]
        x = self.lin_a(a)
        if c is not None:
            # Broadcast the static context over the time axis
            x = x + self.lin_c(c).unsqueeze(1)
        x = F.elu(x)
        x = self.lin_i(x)
        x = self.dropout(x)
        return self.gate(x, self.project_a(a))
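
# A minimal shape sketch (illustrative, not part of the original file):
#   grn = GRN(input_size=16, output_size=16, context_size=16)
#   y = grn(torch.randn(2, 24, 16), torch.randn(2, 16))   # y: [2, 24, 16]
# Without a context, the GRN reduces to a gated residual feed-forward block.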


class VariableSelectionNetwork(nn.Module):
    def __init__(self, d_model, variable_num, dropout=0.0):
        super(VariableSelectionNetwork, self).__init__()
        self.joint_grn = GRN(d_model * variable_num, variable_num, hidden_size=d_model, context_size=d_model, dropout=dropout)
        self.variable_grns = nn.ModuleList([GRN(d_model, d_model, dropout=dropout) for _ in range(variable_num)])

    def forward(self, x: Tensor, context: Optional[Tensor] = None):
        # x: [B,T,C,d] or [B,C,d]
        # selection_weights: [B,T,C] or [B,C]
        # x_processed: [B,T,d,C] or [B,d,C]
        # selection_result: [B,T,d] or [B,d]
        x_flattened = torch.flatten(x, start_dim=-2)
        selection_weights = self.joint_grn(x_flattened, context)
        selection_weights = F.softmax(selection_weights, dim=-1)

        x_processed = torch.stack([grn(x[..., i, :]) for i, grn in enumerate(self.variable_grns)], dim=-1)

        selection_result = torch.matmul(x_processed, selection_weights.unsqueeze(-1)).squeeze(-1)
        return selection_result
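
# The selection weights are a per-step softmax over variables, which is what makes
# TFT's variable importances readable. An illustrative call (values assumed):
#   vsn = VariableSelectionNetwork(d_model=16, variable_num=7)
#   fused = vsn(torch.randn(2, 96, 7, 16))   # fused: [2, 96, 16]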


class StaticCovariateEncoder(nn.Module):
    def __init__(self, d_model, static_len, dropout=0.0):
        super(StaticCovariateEncoder, self).__init__()
        self.static_vsn = VariableSelectionNetwork(d_model, static_len) if static_len else None
        self.grns = nn.ModuleList([GRN(d_model, d_model, dropout=dropout) for _ in range(4)])

    def forward(self, static_input):
        # static_input: [B,C,d]
        # Produces the four static context vectors (c_s, c_c, c_h, c_e) used for
        # variable selection, the LSTM initial states, and static enrichment.
        if static_input is not None:
            static_features = self.static_vsn(static_input)
            return [grn(static_features) for grn in self.grns]
        else:
            return [None] * 4


class InterpretableMultiHeadAttention(nn.Module):
    def __init__(self, configs):
        super(InterpretableMultiHeadAttention, self).__init__()
        self.n_heads = configs.n_heads
        assert configs.d_model % configs.n_heads == 0
        self.d_head = configs.d_model // configs.n_heads
        self.qkv_linears = nn.Linear(configs.d_model, (2 * self.n_heads + 1) * self.d_head, bias=False)
        self.out_projection = nn.Linear(self.d_head, configs.d_model, bias=False)
        self.out_dropout = nn.Dropout(configs.dropout)
        self.scale = self.d_head ** -0.5
        example_len = configs.seq_len + configs.pred_len
        self.register_buffer("mask", torch.triu(torch.full((example_len, example_len), float('-inf')), 1))

    def forward(self, x):
        # Q, K, V are all derived from x (self-attention)
        B, T, d_model = x.shape
        qkv = self.qkv_linears(x)
        q, k, v = qkv.split((self.n_heads * self.d_head, self.n_heads * self.d_head, self.d_head), dim=-1)
        q = q.view(B, T, self.n_heads, self.d_head)
        k = k.view(B, T, self.n_heads, self.d_head)
        v = v.view(B, T, self.d_head)

        attention_score = torch.matmul(q.permute((0, 2, 1, 3)), k.permute((0, 2, 3, 1)))  # [B,n,T,T]
        attention_score.mul_(self.scale)
        attention_score = attention_score + self.mask
        attention_prob = F.softmax(attention_score, dim=3)  # [B,n,T,T]

        attention_out = torch.matmul(attention_prob, v.unsqueeze(1))  # [B,n,T,d]
        attention_out = torch.mean(attention_out, dim=1)  # [B,T,d]
        out = self.out_projection(attention_out)
        out = self.out_dropout(out)  # [B,T,d]
        return out
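
# Design note: unlike standard multi-head attention, all heads share a single value
# projection (hence the (2 * n_heads + 1) * d_head output above) and the per-head
# outputs are averaged before the output projection. This is the "interpretable"
# variant from the TFT paper: the averaged attention weights can be read as one
# temporal importance pattern across heads.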


class TemporalFusionDecoder(nn.Module):
    def __init__(self, configs):
        super(TemporalFusionDecoder, self).__init__()
        self.pred_len = configs.pred_len

        self.history_encoder = nn.LSTM(configs.d_model, configs.d_model, batch_first=True)
        self.future_encoder = nn.LSTM(configs.d_model, configs.d_model, batch_first=True)
        self.gate_after_lstm = GateAddNorm(configs.d_model, configs.d_model)
        self.enrichment_grn = GRN(configs.d_model, configs.d_model, context_size=configs.d_model, dropout=configs.dropout)
        self.attention = InterpretableMultiHeadAttention(configs)
        self.gate_after_attention = GateAddNorm(configs.d_model, configs.d_model)
        self.position_wise_grn = GRN(configs.d_model, configs.d_model, dropout=configs.dropout)
        self.gate_final = GateAddNorm(configs.d_model, configs.d_model)
        self.out_projection = nn.Linear(configs.d_model, configs.c_out)

    def forward(self, history_input, future_input, c_c, c_h, c_e):
        # history_input, future_input: [B,T,d]
        # c_c, c_h, c_e: [B,d]
        # LSTM; nn.LSTM takes its initial state as (h_0, c_0), so the static hidden
        # context c_h comes first and the cell context c_c second.
        c = (c_h.unsqueeze(0), c_c.unsqueeze(0)) if c_c is not None and c_h is not None else None
        historical_features, state = self.history_encoder(history_input, c)
        future_features, _ = self.future_encoder(future_input, state)

        # Skip connection
        temporal_input = torch.cat([history_input, future_input], dim=1)
        temporal_features = torch.cat([historical_features, future_features], dim=1)
        temporal_features = self.gate_after_lstm(temporal_features, temporal_input)  # [B,T,d]

        # Static enrichment
        enriched_features = self.enrichment_grn(temporal_features, c_e)  # [B,T,d]

        # Temporal self-attention
        attention_out = self.attention(enriched_features)  # [B,T,d]
        # Keep only the prediction window: no loss is computed on the historical part
        attention_out = self.gate_after_attention(attention_out[:, -self.pred_len:], enriched_features[:, -self.pred_len:])

        # Position-wise feed-forward
        out = self.position_wise_grn(attention_out)  # [B,pred_len,d]

        # Final skip connection
        out = self.gate_final(out, temporal_features[:, -self.pred_len:])
        return self.out_projection(out)


class Model(nn.Module):
    def __init__(self, configs):
        super(Model, self).__init__()
        self.configs = configs
        self.task_name = configs.task_name
        self.seq_len = configs.seq_len
        self.label_len = configs.label_len
        self.pred_len = configs.pred_len

        # Number of variables of each input type
        self.static_len = len(datatype_dict[configs.data].static)
        self.observed_len = len(datatype_dict[configs.data].observed)
        self.known_len = get_known_len(configs.embed, configs.freq)

        self.embedding = TFTEmbedding(configs)
        self.static_encoder = StaticCovariateEncoder(configs.d_model, self.static_len)
        self.history_vsn = VariableSelectionNetwork(configs.d_model, self.observed_len + self.known_len)
        self.future_vsn = VariableSelectionNetwork(configs.d_model, self.known_len)
        self.temporal_fusion_decoder = TemporalFusionDecoder(configs)

    def forecast(self, x_enc, x_mark_enc, x_dec, x_mark_dec):
        # Normalization from Non-stationary Transformer
        means = x_enc.mean(1, keepdim=True).detach()
        x_enc = x_enc - means
        stdev = torch.sqrt(torch.var(x_enc, dim=1, keepdim=True, unbiased=False) + 1e-5)
        x_enc /= stdev

        # Data embedding
        # static_input: [B,C,d], observed_input: [B,T,C,d], known_input: [B,T,C,d]
        static_input, observed_input, known_input = self.embedding(x_enc, x_mark_enc, x_dec, x_mark_dec)

        # Static context
        # c_s, ..., c_e: [B,d]
        c_s, c_c, c_h, c_e = self.static_encoder(static_input)

        # Temporal input selection
        history_input = torch.cat([observed_input, known_input[:, :self.seq_len]], dim=-2)
        future_input = known_input[:, self.seq_len:]
        history_input = self.history_vsn(history_input, c_s)
        future_input = self.future_vsn(future_input, c_s)

        # TFT main procedure after variable selection
        # history_input: [B,T,d], future_input: [B,T,d]
        dec_out = self.temporal_fusion_decoder(history_input, future_input, c_c, c_h, c_e)

        # De-Normalization from Non-stationary Transformer
        dec_out = dec_out * (stdev[:, 0, :].unsqueeze(1).repeat(1, self.pred_len, 1))
        dec_out = dec_out + (means[:, 0, :].unsqueeze(1).repeat(1, self.pred_len, 1))
        return dec_out

    def forward(self, x_enc, x_mark_enc, x_dec, x_mark_dec):
        if self.task_name == 'long_term_forecast' or self.task_name == 'short_term_forecast':
            dec_out = self.forecast(x_enc, x_mark_enc, x_dec, x_mark_dec)  # [B,pred_len,C]
            # Pad the history with zeros so the output spans the full window
            dec_out = torch.cat([torch.zeros_like(x_enc), dec_out], dim=1)
            return dec_out  # [B,seq_len+pred_len,C]
        return None
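

# ---------------------------------------------------------------------------
# Minimal smoke test: a sketch, not part of the original model code. It assumes
# the standard Time-Series-Library `configs` fields used above and random
# ETTh1-shaped inputs (7 variables; 4 time features for embed='timeF', freq='h').
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    from types import SimpleNamespace

    configs = SimpleNamespace(
        task_name='long_term_forecast', data='ETTh1',
        seq_len=96, label_len=48, pred_len=96,
        d_model=16, n_heads=4, dropout=0.1,
        embed='timeF', freq='h', c_out=7,
    )
    model = Model(configs)

    B = 2
    x_enc = torch.randn(B, configs.seq_len, 7)
    x_mark_enc = torch.randn(B, configs.seq_len, 4)
    x_dec = torch.randn(B, configs.label_len + configs.pred_len, 7)
    x_mark_dec = torch.randn(B, configs.label_len + configs.pred_len, 4)

    out = model(x_enc, x_mark_enc, x_dec, x_mark_dec)
    print(out.shape)  # expected: torch.Size([2, 192, 7])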