import math

import torch
import torch.nn as nn


class LSTMCell(nn.Module):
    """A single LSTM cell built from two linear maps (input-to-hidden and
    hidden-to-hidden) that jointly produce the four gate pre-activations."""

    def __init__(self, input_size, hidden_size, bias=True):
        super(LSTMCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.bias = bias
        self.x2h = nn.Linear(input_size, 4 * hidden_size, bias=bias)
        self.h2h = nn.Linear(hidden_size, 4 * hidden_size, bias=bias)
        self.reset_parameters()

    def reset_parameters(self):
        # Uniform init in [-1/sqrt(hidden_size), 1/sqrt(hidden_size)],
        # matching the default initialization of nn.LSTM.
        std = 1.0 / math.sqrt(self.hidden_size)
        for w in self.parameters():
            w.data.uniform_(-std, std)

    def forward(self, x, hidden):
        hx, cx = hidden
        x = x.view(-1, x.size(1))                   # ensure (batch, input_size)
        gates = self.x2h(x) + self.h2h(hx)          # (batch, 4 * hidden_size)
        ingate, forgetgate, cellgate, outgate = gates.chunk(4, 1)
        ingate = torch.sigmoid(ingate)              # torch.sigmoid/tanh replace the
        forgetgate = torch.sigmoid(forgetgate)      # deprecated F.sigmoid/F.tanh
        cellgate = torch.tanh(cellgate)
        outgate = torch.sigmoid(outgate)
        cy = forgetgate * cx + ingate * cellgate    # new cell state
        hy = outgate * torch.tanh(cy)               # new hidden state
        return (hy, cy)


class LSTMModel(nn.Module):
    """Unrolls an LSTMCell over the time dimension and applies a final
    linear layer to the last hidden state."""

    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim, bias=True):
        super(LSTMModel, self).__init__()
        # Hidden dimensions
        self.hidden_dim = hidden_dim
        # Number of hidden layers
        self.layer_dim = layer_dim
        # Fixed: the original passed layer_dim as the bias flag here.
        self.lstm = LSTMCell(input_dim, hidden_dim, bias)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # x: (batch, time_steps, input_dim).
        # Initialize hidden and cell states with zeros on the same device as
        # the input; this replaces the deprecated Variable/.cuda() branches.
        h0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim, device=x.device)
        c0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim, device=x.device)

        # Only a single cell is unrolled, so only layer 0 of h0/c0 is used.
        hn = h0[0, :, :]
        cn = c0[0, :, :]
        outs = []
        for seq in range(x.size(1)):
            hn, cn = self.lstm(x[:, seq, :], (hn, cn))
            outs.append(hn)

        out = outs[-1]          # last hidden state: (batch, hidden_dim)
        out = self.fc(out)      # (batch, output_dim)
        return out


class LSTM_model(nn.Module):
    """Binary classifier: embedding -> unidirectional LSTM -> linear logit."""

    def __init__(self, vocab_size, n_hidden):
        super(LSTM_model, self).__init__()
        self.embedding = nn.Embedding(vocab_size, n_hidden)
        # One recurrent layer: the original passed n_hidden as layer_dim,
        # but LSTMModel only ever unrolls a single cell.
        self.lstm = LSTMModel(n_hidden, n_hidden, 1, n_hidden)
        self.fc_output = nn.Linear(n_hidden, 1)
        self.loss = nn.BCEWithLogitsLoss()

    def forward(self, X, t, train=True):
        embed = self.embedding(X)       # (batch, time_steps, n_hidden)
        fc_out = self.lstm(embed)       # (batch, n_hidden)
        h = self.fc_output(fc_out)      # (batch, 1)
        return self.loss(h[:, 0], t), h[:, 0]
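
# ---------------------------------------------------------------------------
# Hedged usage sketch: a minimal forward/backward smoke test for LSTM_model.
# The vocab size, batch size, and sequence length below are illustrative
# assumptions, not values taken from the original training setup.
# ---------------------------------------------------------------------------
def _smoke_test_lstm_model(vocab_size=1000, n_hidden=64, batch_size=8, seq_len=20):
    model = LSTM_model(vocab_size, n_hidden)
    X = torch.randint(0, vocab_size, (batch_size, seq_len))   # random token ids
    t = torch.randint(0, 2, (batch_size,)).float()            # random binary labels
    loss, logits = model(X, t)
    loss.backward()                                           # check gradients flow
    print(f"LSTM_model loss: {loss.item():.4f}, logits: {tuple(logits.shape)}")
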
class BiLSTM(nn.Module):
    """Bidirectional wrapper around two LSTMCells: one pass left-to-right,
    one pass right-to-left, outputs concatenated along the feature axis."""

    def __init__(self, input_size, hidden_size, bias=True):
        super(BiLSTM, self).__init__()
        self.forward_cell = LSTMCell(input_size, hidden_size, bias)
        self.backward_cell = LSTMCell(input_size, hidden_size, bias)

    def forward(self, input_seq):
        # input_seq: (batch, time_steps, input_size)
        batch_size = input_seq.size(0)
        hidden_size = self.forward_cell.hidden_size
        device = input_seq.device

        forward_hidden = (torch.zeros(batch_size, hidden_size, device=device),
                          torch.zeros(batch_size, hidden_size, device=device))
        backward_hidden = (torch.zeros(batch_size, hidden_size, device=device),
                           torch.zeros(batch_size, hidden_size, device=device))

        forward_outputs = []
        backward_outputs = []
        for t in range(input_seq.size(1)):
            forward_hidden = self.forward_cell(input_seq[:, t], forward_hidden)
            forward_outputs.append(forward_hidden[0])
        for t in range(input_seq.size(1) - 1, -1, -1):
            backward_hidden = self.backward_cell(input_seq[:, t], backward_hidden)
            backward_outputs.append(backward_hidden[0])

        # Note: backward_outputs is stored in reverse time order, so index -1
        # holds the backward state after reading the whole sequence; the
        # classifier below relies on this when it takes the last time step.
        forward_outputs = torch.stack(forward_outputs, dim=1)    # (batch, T, hidden)
        backward_outputs = torch.stack(backward_outputs, dim=1)  # (batch, T, hidden)
        outputs = torch.cat((forward_outputs, backward_outputs), dim=2)  # (batch, T, 2*hidden)
        return outputs


class BiLSTMModel(nn.Module):
    """Binary classifier: embedding -> BiLSTM -> linear logit on the last step."""

    def __init__(self, vocab_size, n_hidden):
        super(BiLSTMModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, n_hidden)
        self.bilstm = BiLSTM(n_hidden, n_hidden)
        self.fc_output = nn.Linear(2 * n_hidden, 1)
        self.loss = nn.BCEWithLogitsLoss()

    def forward(self, X, t, train=True):
        embed = self.embedding(X)           # (batch, time_steps, n_hidden)
        bilstm_out = self.bilstm(embed)     # (batch, time_steps, 2 * n_hidden)
        bilstm_out = bilstm_out[:, -1, :]   # final forward and backward summaries
        h = self.fc_output(bilstm_out)      # (batch, 1)
        return self.loss(h[:, 0], t), h[:, 0]
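
# ---------------------------------------------------------------------------
# Hedged usage sketch: the same smoke test for the bidirectional model. It
# illustrates that BiLSTM's per-step output is 2 * n_hidden wide (forward and
# backward states concatenated), matching fc_output above. All sizes are
# illustrative assumptions.
# ---------------------------------------------------------------------------
def _smoke_test_bilstm_model(vocab_size=1000, n_hidden=64, batch_size=8, seq_len=20):
    model = BiLSTMModel(vocab_size, n_hidden)
    X = torch.randint(0, vocab_size, (batch_size, seq_len))   # random token ids
    t = torch.randint(0, 2, (batch_size,)).float()            # random binary labels
    loss, logits = model(X, t)
    loss.backward()                                           # check gradients flow
    print(f"BiLSTMModel loss: {loss.item():.4f}, logits: {tuple(logits.shape)}")


if __name__ == "__main__":
    _smoke_test_lstm_model()
    _smoke_test_bilstm_model()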