import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np


class LSTNet(nn.Module):
    """LSTNet (Lai et al., "Modeling Long- and Short-Term Temporal Patterns
    with Deep Neural Networks", SIGIR 2018): a convolution for short-term
    local patterns, a GRU for long-term dependencies, skip-GRUs for periodic
    patterns, and a linear autoregressive component.
    """

    def __init__(
        self,
        num_features: int = 8,
        conv1_out_channels: int = 32,
        conv1_kernel_height: int = 7,
        recc1_out_channels: int = 64,
        skip_steps: tuple[int, ...] = (4, 24),
        skip_reccs_out_channels: tuple[int, ...] = (4, 4),
        output_out_features: int = 1,
        ar_window_size: int = 7,
        dropout: float = 0.1,
    ):
        super().__init__()
        self.num_features = num_features
        self.conv1_out_channels = conv1_out_channels
        self.conv1_kernel_height = conv1_kernel_height
        self.recc1_out_channels = recc1_out_channels
        self.skip_steps = skip_steps
        self.skip_reccs_out_channels = skip_reccs_out_channels
        self.output_out_features = output_out_features
        self.ar_window_size = ar_window_size
        self.dropout = nn.Dropout(p=dropout)

        # The convolution kernel spans the full feature axis, so its output width is 1.
        self.conv1 = nn.Conv2d(1, self.conv1_out_channels,
                               kernel_size=(self.conv1_kernel_height, self.num_features))
        self.recc1 = nn.GRU(self.conv1_out_channels, self.recc1_out_channels, batch_first=True)
        # One skip-GRU per skip period.
        self.skip_reccs = nn.ModuleList()
        for i in range(len(self.skip_steps)):
            self.skip_reccs.append(
                nn.GRU(self.conv1_out_channels, self.skip_reccs_out_channels[i], batch_first=True))
        # Each skip-GRU contributes skip_steps[i] * skip_reccs_out_channels[i] features,
        # so the combined width is recc1_out_channels + dot(skip_steps, skip_reccs_out_channels).
        self.output_in_features = int(self.recc1_out_channels
                                      + np.dot(self.skip_steps, self.skip_reccs_out_channels))
        self.output = nn.Linear(self.output_in_features, self.output_out_features)
        if self.ar_window_size > 0:
            self.ar = nn.Linear(self.ar_window_size, 1)

    def forward(self, X, fut_time=None):
        """
        Parameters:
            X (tensor) [batch_size, time_steps, num_features]
            fut_time: unused; kept for interface compatibility
        Returns:
            tensor [batch_size, output_out_features]
        """
        batch_size = X.size(0)

        # Convolutional component:
        # [batch, time, features] -> [batch, 1, time, features] -> [batch, channels, time'].
        C = X.unsqueeze(1)
        C = F.relu(self.conv1(C))
        C = self.dropout(C)
        C = torch.squeeze(C, 3)

        # Recurrent component: keep the GRU's last output step.
        R = C.permute(0, 2, 1)  # [batch, time', channels]
        out, hidden = self.recc1(R)
        R = out[:, -1, :]
        R = self.dropout(R)

        # Recurrent-skip component: fold time' into (skip_sequence_len, skip_step)
        # so each skip-GRU reads one subsequence per phase of the skip period.
        shrinked_time_steps = C.size(2)
        for i in range(len(self.skip_steps)):
            skip_step = self.skip_steps[i]
            skip_sequence_len = shrinked_time_steps // skip_step

            S = C[:, :, -skip_sequence_len * skip_step:]
            S = S.view(S.size(0), S.size(1), skip_sequence_len, skip_step)
            S = S.permute(0, 3, 2, 1).contiguous()  # [batch, skip_step, seq_len, channels]
            S = S.view(S.size(0) * S.size(1), S.size(2), S.size(3))
            out, hidden = self.skip_reccs[i](S)
            S = out[:, -1, :]
            S = S.view(batch_size, skip_step * S.size(1))  # [batch, skip_step * hidden]
            S = self.dropout(S)
            R = torch.cat((R, S), 1)

        # Non-linear output on the concatenated recurrent features.
        O = F.relu(self.output(R))

        if self.ar_window_size > 0:
            # Autoregressive component on a single input column
            # (hard-coded to feature index 3, presumably the target series).
            AR = X[:, -self.ar_window_size:, 3:4]
            AR = AR.permute(0, 2, 1).contiguous()  # [batch, 1, ar_window_size]
            AR = self.ar(AR)
            AR = AR.squeeze(2)
            O = O + AR

        return O
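

# Minimal smoke test: a sketch, not part of the original module. The batch size
# and window length below are illustrative assumptions; the input window must be
# long enough that time_steps - conv1_kernel_height + 1 >= max(skip_steps).
if __name__ == "__main__":
    model = LSTNet()
    X = torch.randn(16, 96, 8)  # [batch_size=16, time_steps=96, num_features=8]
    y = model(X, None)          # fut_time is unused by forward()
    print(y.shape)              # expected: torch.Size([16, 1])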