File size: 2,519 Bytes
9485251 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
import torch
import torch.nn as nn
class LstmModel(nn.Module):
def __init__(self, in_size: int, latent_size: int) -> None:
"""
Initialize the LSTM autoencoder model.
Parameters:
-----------
in_size : int
Number of features in the input (input dimension).
latent_size : int
Size of the latent space representation in the LSTM.
Example:
--------
For in_size = 5, latent_size = 50, the model will:
- Take inputs with 5 features
- Encode them into a 50-dimensional latent space
- Decode back to the original 5 feature dimensions
Architecture:
- LSTM layer for encoding with dropout 0.2
- Additional dropout layer (0.2)
- ReLU activation
- Fully connected layer for decoding
"""
super().__init__() # Corrected the position of super().__init__()
self.lstm = nn.LSTM(
input_size=in_size,
hidden_size=latent_size,
num_layers=1,
batch_first=True,
dropout=0.2
) # input and output tensors are provided as (batch, seq_len, feature(size))
self.dropout = nn.Dropout(0.2)
self.relu = nn.ReLU()
self.fc = nn.Linear(latent_size, in_size)
def forward(self, w: torch.Tensor) -> torch.Tensor:
"""
Forward pass through the LSTM model.
Parameters:
-----------
w : torch.Tensor
Input tensor of shape (batch_size, seq_len, in_size).
Returns:
--------
torch.Tensor
Output tensor of shape (batch_size, in_size).
Example:
--------
If the input tensor w has shape (32, 10, 5), the output will have shape (32, 5).
The LSTM processes the input sequence and returns the last output of the sequence.
The output is then passed through a ReLU activation function and a dropout layer.
Finally, it is passed through a fully connected layer to produce the final output.
The output is the reconstructed input sequence.
"""
z, (h_n, c_n) = self.lstm(w)
forecast = z[:, -1, :]
forecast = self.relu(forecast)
forecast = self.dropout(forecast)
output = self.fc(forecast)
return output
def testing(model, test_loader, device):
results=[]
forecast = []
with torch.no_grad():
for X_batch, y_batch in test_loader:
X_batch = X_batch.to(device)
y_batch = y_batch.to(device)
w=model(X_batch)
results.append(torch.mean((y_batch.unsqueeze(1)-w)**2, axis=1))
forecast.append(w)
return results, forecast |