# -*- coding: utf-8 -*- """ Implement a pyTorch LSTM with hard sigmoid reccurent activation functions. Adapted from the non-cuda variant of pyTorch LSTM at https://github.com/pytorch/pytorch/blob/master/torch/nn/_functions/rnn.py """ from __future__ import print_function, division import math import torch from torch.nn import Module from torch.nn.parameter import Parameter from torch.nn.utils.rnn import PackedSequence import torch.nn.functional as F class LSTMHardSigmoid(Module): def __init__(self, input_size, hidden_size, num_layers=1, bias=True, batch_first=False, dropout=0, bidirectional=False): super(LSTMHardSigmoid, self).__init__() self.input_size = input_size self.hidden_size = hidden_size self.num_layers = num_layers self.bias = bias self.batch_first = batch_first self.dropout = dropout self.dropout_state = {} self.bidirectional = bidirectional num_directions = 2 if bidirectional else 1 gate_size = 4 * hidden_size self._all_weights = [] for layer in range(num_layers): for direction in range(num_directions): layer_input_size = input_size if layer == 0 else hidden_size * num_directions w_ih = Parameter(torch.Tensor(gate_size, layer_input_size)) w_hh = Parameter(torch.Tensor(gate_size, hidden_size)) b_ih = Parameter(torch.Tensor(gate_size)) b_hh = Parameter(torch.Tensor(gate_size)) layer_params = (w_ih, w_hh, b_ih, b_hh) suffix = '_reverse' if direction == 1 else '' param_names = ['weight_ih_l{}{}', 'weight_hh_l{}{}'] if bias: param_names += ['bias_ih_l{}{}', 'bias_hh_l{}{}'] param_names = [x.format(layer, suffix) for x in param_names] for name, param in zip(param_names, layer_params): setattr(self, name, param) self._all_weights.append(param_names) self.flatten_parameters() self.reset_parameters() def flatten_parameters(self): """Resets parameter data pointer so that they can use faster code paths. Right now, this is a no-op wince we don't use CUDA acceleration. """ self._data_ptrs = [] def _apply(self, fn): ret = super(LSTMHardSigmoid, self)._apply(fn) self.flatten_parameters() return ret def reset_parameters(self): stdv = 1.0 / math.sqrt(self.hidden_size) for weight in self.parameters(): weight.data.uniform_(-stdv, stdv) def forward(self, input, hx=None): is_packed = isinstance(input, PackedSequence) if is_packed: input, batch_sizes ,_ ,_ = input max_batch_size = batch_sizes[0] else: batch_sizes = None max_batch_size = input.size(0) if self.batch_first else input.size(1) if hx is None: num_directions = 2 if self.bidirectional else 1 hx = torch.autograd.Variable(input.data.new(self.num_layers * num_directions, max_batch_size, self.hidden_size).zero_(), requires_grad=False) hx = (hx, hx) has_flat_weights = list(p.data.data_ptr() for p in self.parameters()) == self._data_ptrs if has_flat_weights: first_data = next(self.parameters()).data assert first_data.storage().size() == self._param_buf_size flat_weight = first_data.new().set_(first_data.storage(), 0, torch.Size([self._param_buf_size])) else: flat_weight = None func = AutogradRNN( self.input_size, self.hidden_size, num_layers=self.num_layers, batch_first=self.batch_first, dropout=self.dropout, train=self.training, bidirectional=self.bidirectional, batch_sizes=batch_sizes, dropout_state=self.dropout_state, flat_weight=flat_weight ) output, hidden = func(input, self.all_weights, hx) if is_packed: output = PackedSequence(output, batch_sizes) return output, hidden def __repr__(self): s = '{name}({input_size}, {hidden_size}' if self.num_layers != 1: s += ', num_layers={num_layers}' if self.bias is not True: s += ', bias={bias}' if self.batch_first is not False: s += ', batch_first={batch_first}' if self.dropout != 0: s += ', dropout={dropout}' if self.bidirectional is not False: s += ', bidirectional={bidirectional}' s += ')' return s.format(name=self.__class__.__name__, **self.__dict__) def __setstate__(self, d): super(LSTMHardSigmoid, self).__setstate__(d) self.__dict__.setdefault('_data_ptrs', []) if 'all_weights' in d: self._all_weights = d['all_weights'] if isinstance(self._all_weights[0][0], str): return num_layers = self.num_layers num_directions = 2 if self.bidirectional else 1 self._all_weights = [] for layer in range(num_layers): for direction in range(num_directions): suffix = '_reverse' if direction == 1 else '' weights = ['weight_ih_l{}{}', 'weight_hh_l{}{}', 'bias_ih_l{}{}', 'bias_hh_l{}{}'] weights = [x.format(layer, suffix) for x in weights] if self.bias: self._all_weights += [weights] else: self._all_weights += [weights[:2]] @property def all_weights(self): return [[getattr(self, weight) for weight in weights] for weights in self._all_weights] def AutogradRNN(input_size, hidden_size, num_layers=1, batch_first=False, dropout=0, train=True, bidirectional=False, batch_sizes=None, dropout_state=None, flat_weight=None): cell = LSTMCell if batch_sizes is None: rec_factory = Recurrent else: rec_factory = variable_recurrent_factory(batch_sizes) if bidirectional: layer = (rec_factory(cell), rec_factory(cell, reverse=True)) else: layer = (rec_factory(cell),) func = StackedRNN(layer, num_layers, True, dropout=dropout, train=train) def forward(input, weight, hidden): if batch_first and batch_sizes is None: input = input.transpose(0, 1) nexth, output = func(input, hidden, weight) if batch_first and batch_sizes is None: output = output.transpose(0, 1) return output, nexth return forward def Recurrent(inner, reverse=False): def forward(input, hidden, weight): output = [] steps = range(input.size(0) - 1, -1, -1) if reverse else range(input.size(0)) for i in steps: hidden = inner(input[i], hidden, *weight) # hack to handle LSTM output.append(hidden[0] if isinstance(hidden, tuple) else hidden) if reverse: output.reverse() output = torch.cat(output, 0).view(input.size(0), *output[0].size()) return hidden, output return forward def variable_recurrent_factory(batch_sizes): def fac(inner, reverse=False): if reverse: return VariableRecurrentReverse(batch_sizes, inner) else: return VariableRecurrent(batch_sizes, inner) return fac def VariableRecurrent(batch_sizes, inner): def forward(input, hidden, weight): output = [] input_offset = 0 last_batch_size = batch_sizes[0] hiddens = [] flat_hidden = not isinstance(hidden, tuple) if flat_hidden: hidden = (hidden,) for batch_size in batch_sizes: step_input = input[input_offset:input_offset + batch_size] input_offset += batch_size dec = last_batch_size - batch_size if dec > 0: hiddens.append(tuple(h[-dec:] for h in hidden)) hidden = tuple(h[:-dec] for h in hidden) last_batch_size = batch_size if flat_hidden: hidden = (inner(step_input, hidden[0], *weight),) else: hidden = inner(step_input, hidden, *weight) output.append(hidden[0]) hiddens.append(hidden) hiddens.reverse() hidden = tuple(torch.cat(h, 0) for h in zip(*hiddens)) assert hidden[0].size(0) == batch_sizes[0] if flat_hidden: hidden = hidden[0] output = torch.cat(output, 0) return hidden, output return forward def VariableRecurrentReverse(batch_sizes, inner): def forward(input, hidden, weight): output = [] input_offset = input.size(0) last_batch_size = batch_sizes[-1] initial_hidden = hidden flat_hidden = not isinstance(hidden, tuple) if flat_hidden: hidden = (hidden,) initial_hidden = (initial_hidden,) hidden = tuple(h[:batch_sizes[-1]] for h in hidden) for batch_size in reversed(batch_sizes): inc = batch_size - last_batch_size if inc > 0: hidden = tuple(torch.cat((h, ih[last_batch_size:batch_size]), 0) for h, ih in zip(hidden, initial_hidden)) last_batch_size = batch_size step_input = input[input_offset - batch_size:input_offset] input_offset -= batch_size if flat_hidden: hidden = (inner(step_input, hidden[0], *weight),) else: hidden = inner(step_input, hidden, *weight) output.append(hidden[0]) output.reverse() output = torch.cat(output, 0) if flat_hidden: hidden = hidden[0] return hidden, output return forward def StackedRNN(inners, num_layers, lstm=False, dropout=0, train=True): num_directions = len(inners) total_layers = num_layers * num_directions def forward(input, hidden, weight): assert(len(weight) == total_layers) next_hidden = [] if lstm: hidden = list(zip(*hidden)) for i in range(num_layers): all_output = [] for j, inner in enumerate(inners): l = i * num_directions + j hy, output = inner(input, hidden[l], weight[l]) next_hidden.append(hy) all_output.append(output) input = torch.cat(all_output, input.dim() - 1) if dropout != 0 and i < num_layers - 1: input = F.dropout(input, p=dropout, training=train, inplace=False) if lstm: next_h, next_c = zip(*next_hidden) next_hidden = ( torch.cat(next_h, 0).view(total_layers, *next_h[0].size()), torch.cat(next_c, 0).view(total_layers, *next_c[0].size()) ) else: next_hidden = torch.cat(next_hidden, 0).view( total_layers, *next_hidden[0].size()) return next_hidden, input return forward def LSTMCell(input, hidden, w_ih, w_hh, b_ih=None, b_hh=None): """ A modified LSTM cell with hard sigmoid activation on the input, forget and output gates. """ hx, cx = hidden gates = F.linear(input, w_ih, b_ih) + F.linear(hx, w_hh, b_hh) ingate, forgetgate, cellgate, outgate = gates.chunk(4, 1) ingate = hard_sigmoid(ingate) forgetgate = hard_sigmoid(forgetgate) cellgate = F.tanh(cellgate) outgate = hard_sigmoid(outgate) cy = (forgetgate * cx) + (ingate * cellgate) hy = outgate * F.tanh(cy) return hy, cy def hard_sigmoid(x): """ Computes element-wise hard sigmoid of x. See e.g. https://github.com/Theano/Theano/blob/master/theano/tensor/nnet/sigm.py#L279 """ x = (0.2 * x) + 0.5 x = F.threshold(-x, -1, -1) x = F.threshold(-x, 0, 0) return x