File size: 4,713 Bytes
6dd3ebe |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
# https://github.com/gokulkarthik/LSTNet.pytorch/blob/master/LSTNet.py
class LSTNet(nn.Module):
def __init__(
self,
num_features: int = 8,
conv1_out_channels: int = 32,
conv1_kernel_height: int = 7,
recc1_out_channels: int = 64,
skip_steps: list[int] = [4,24],
skip_reccs_out_channels: list[int] = [4,4],
output_out_features: int = 1,
ar_window_size: int = 7,
dropout: float = 0.1
):
super(LSTNet, self).__init__()
self.num_features = num_features
self.conv1_out_channels = conv1_out_channels
self.conv1_kernel_height = conv1_kernel_height
self.recc1_out_channels = recc1_out_channels
self.skip_steps = skip_steps
self.skip_reccs_out_channels = skip_reccs_out_channels
self.output_out_features = output_out_features
self.ar_window_size = ar_window_size
self.dropout = nn.Dropout(p = dropout)
self.conv1 = nn.Conv2d(1, self.conv1_out_channels,
kernel_size=(self.conv1_kernel_height, self.num_features))
self.recc1 = nn.GRU(self.conv1_out_channels, self.recc1_out_channels, batch_first=True)
self.skip_reccs = nn.ModuleList()
for i in range(len(self.skip_steps)):
self.skip_reccs.append(nn.GRU(self.conv1_out_channels, self.skip_reccs_out_channels[i], batch_first=True))
self.output_in_features = self.recc1_out_channels + np.dot(self.skip_steps, self.skip_reccs_out_channels)
self.output = nn.Linear(self.output_in_features, self.output_out_features)
if self.ar_window_size > 0:
self.ar = nn.Linear(self.ar_window_size, 1)
def forward(self, X, fut_time):
"""
Parameters:
X (tensor) [batch_size, time_steps, num_features]
"""
batch_size = X.size(0)
# Convolutional Layer
C = X.unsqueeze(1) # [batch_size, num_channels=1, time_steps, num_features]
C = F.relu(self.conv1(C)) # [batch_size, conv1_out_channels, shrinked_time_steps, 1]
C = self.dropout(C)
C = torch.squeeze(C, 3) # [batch_size, conv1_out_channels, shrinked_time_steps]
# Recurrent Layer
R = C.permute(0, 2, 1) # [batch_size, shrinked_time_steps, conv1_out_channels]
out, hidden = self.recc1(R) # [batch_size, shrinked_time_steps, recc_out_channels]
R = out[:, -1, :] # [batch_size, recc_out_channels]
R = self.dropout(R)
#print(R.shape)
# Skip Recurrent Layers
shrinked_time_steps = C.size(2)
for i in range(len(self.skip_steps)):
skip_step = self.skip_steps[i]
skip_sequence_len = shrinked_time_steps // skip_step
# shrinked_time_steps shrinked further
S = C[:, :, -skip_sequence_len*skip_step:] # [batch_size, conv1_out_channels, shrinked_time_steps]
S = S.view(S.size(0), S.size(1), skip_sequence_len, skip_step) # [batch_size, conv1_out_channels, skip_sequence_len, skip_step=num_skip_components]
# note that num_skip_components = skip_step
S = S.permute(0, 3, 2, 1).contiguous() # [batch_size, skip_step=num_skip_components, skip_sequence_len, conv1_out_channels]
S = S.view(S.size(0)*S.size(1), S.size(2), S.size(3)) # [batch_size*num_skip_components, skip_sequence_len, conv1_out_channels]
out, hidden = self.skip_reccs[i](S) # [batch_size*num_skip_components, skip_sequence_len, skip_reccs_out_channels[i]]
S = out[:, -1, :] # [batch_size*num_skip_components, skip_reccs_out_channels[i]]
S = S.view(batch_size, skip_step*S.size(1)) # [batch_size, num_skip_components*skip_reccs_out_channels[i]]
S = self.dropout(S)
R = torch.cat((R, S), 1) # [batch_size, recc_out_channels + skip_reccs_out_channels * num_skip_components]
#print(S.shape)
#print(R.shape)
# Output Layer
O = F.relu(self.output(R)) # [batch_size, output_out_features=1]
if self.ar_window_size > 0:
# set dim3 based on output_out_features
AR = X[:, -self.ar_window_size:, 3:4] # [batch_size, ar_window_size, output_out_features=1]
AR = AR.permute(0, 2, 1).contiguous() # [batch_size, output_out_features, ar_window_size]
AR = self.ar(AR) # [batch_size, output_out_features, 1]
AR = AR.squeeze(2) # [batch_size, output_out_features]
O = O + AR
return O |