# -*- coding: utf-8 -*- """ Model definition functions and weight loading. """ from __future__ import print_function, division, unicode_literals from os.path import exists import torch import torch.nn as nn from torch.autograd import Variable from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence, PackedSequence from torchmoji.lstm import LSTMHardSigmoid from torchmoji.attlayer import Attention from torchmoji.global_variables import NB_TOKENS, NB_EMOJI_CLASSES def torchmoji_feature_encoding(weight_path, return_attention=False): """ Loads the pretrained torchMoji model for extracting features from the penultimate feature layer. In this way, it transforms the text into its emotional encoding. # Arguments: weight_path: Path to model weights to be loaded. return_attention: If true, output will include weight of each input token used for the prediction # Returns: Pretrained model for encoding text into feature vectors. """ model = TorchMoji(nb_classes=None, nb_tokens=NB_TOKENS, feature_output=True, return_attention=return_attention) load_specific_weights(model, weight_path, exclude_names=['output_layer']) return model def torchmoji_emojis(weight_path, return_attention=False): """ Loads the pretrained torchMoji model for extracting features from the penultimate feature layer. In this way, it transforms the text into its emotional encoding. # Arguments: weight_path: Path to model weights to be loaded. return_attention: If true, output will include weight of each input token used for the prediction # Returns: Pretrained model for encoding text into feature vectors. """ model = TorchMoji(nb_classes=NB_EMOJI_CLASSES, nb_tokens=NB_TOKENS, return_attention=return_attention) model.load_state_dict(torch.load(weight_path)) return model def torchmoji_transfer(nb_classes, weight_path=None, extend_embedding=0, embed_dropout_rate=0.1, final_dropout_rate=0.5): """ Loads the pretrained torchMoji model for finetuning/transfer learning. Does not load weights for the softmax layer. Note that if you are planning to use class average F1 for evaluation, nb_classes should be set to 2 instead of the actual number of classes in the dataset, since binary classification will be performed on each class individually. Note that for the 'new' method, weight_path should be left as None. # Arguments: nb_classes: Number of classes in the dataset. weight_path: Path to model weights to be loaded. extend_embedding: Number of tokens that have been added to the vocabulary on top of NB_TOKENS. If this number is larger than 0, the embedding layer's dimensions are adjusted accordingly, with the additional weights being set to random values. embed_dropout_rate: Dropout rate for the embedding layer. final_dropout_rate: Dropout rate for the final Softmax layer. # Returns: Model with the given parameters. """ model = TorchMoji(nb_classes=nb_classes, nb_tokens=NB_TOKENS + extend_embedding, embed_dropout_rate=embed_dropout_rate, final_dropout_rate=final_dropout_rate, output_logits=True) if weight_path is not None: load_specific_weights(model, weight_path, exclude_names=['output_layer'], extend_embedding=extend_embedding) return model class TorchMoji(nn.Module): def __init__(self, nb_classes, nb_tokens, feature_output=False, output_logits=False, embed_dropout_rate=0, final_dropout_rate=0, return_attention=False): """ torchMoji model. IMPORTANT: The model is loaded in evaluation mode by default (self.eval()) # Arguments: nb_classes: Number of classes in the dataset. nb_tokens: Number of tokens in the dataset (i.e. vocabulary size). feature_output: If True the model returns the penultimate feature vector rather than Softmax probabilities (defaults to False). output_logits: If True the model returns logits rather than probabilities (defaults to False). embed_dropout_rate: Dropout rate for the embedding layer. final_dropout_rate: Dropout rate for the final Softmax layer. return_attention: If True the model also returns attention weights over the sentence (defaults to False). """ super(TorchMoji, self).__init__() embedding_dim = 256 hidden_size = 512 attention_size = 4 * hidden_size + embedding_dim self.feature_output = feature_output self.embed_dropout_rate = embed_dropout_rate self.final_dropout_rate = final_dropout_rate self.return_attention = return_attention self.hidden_size = hidden_size self.output_logits = output_logits self.nb_classes = nb_classes self.add_module('embed', nn.Embedding(nb_tokens, embedding_dim)) # dropout2D: embedding channels are dropped out instead of words # many exampels in the datasets contain few words that losing one or more words can alter the emotions completely self.add_module('embed_dropout', nn.Dropout2d(embed_dropout_rate)) self.add_module('lstm_0', LSTMHardSigmoid(embedding_dim, hidden_size, batch_first=True, bidirectional=True)) self.add_module('lstm_1', LSTMHardSigmoid(hidden_size*2, hidden_size, batch_first=True, bidirectional=True)) self.add_module('attention_layer', Attention(attention_size=attention_size, return_attention=return_attention)) if not feature_output: self.add_module('final_dropout', nn.Dropout(final_dropout_rate)) if output_logits: self.add_module('output_layer', nn.Sequential(nn.Linear(attention_size, nb_classes if self.nb_classes > 2 else 1))) else: self.add_module('output_layer', nn.Sequential(nn.Linear(attention_size, nb_classes if self.nb_classes > 2 else 1), nn.Softmax() if self.nb_classes > 2 else nn.Sigmoid())) self.init_weights() # Put model in evaluation mode by default self.eval() def init_weights(self): """ Here we reproduce Keras default initialization weights for consistency with Keras version """ ih = (param.data for name, param in self.named_parameters() if 'weight_ih' in name) hh = (param.data for name, param in self.named_parameters() if 'weight_hh' in name) b = (param.data for name, param in self.named_parameters() if 'bias' in name) nn.init.uniform(self.embed.weight.data, a=-0.5, b=0.5) for t in ih: nn.init.xavier_uniform(t) for t in hh: nn.init.orthogonal(t) for t in b: nn.init.constant(t, 0) if not self.feature_output: nn.init.xavier_uniform(self.output_layer[0].weight.data) def forward(self, input_seqs): """ Forward pass. # Arguments: input_seqs: Can be one of Numpy array, Torch.LongTensor, Torch.Variable, Torch.PackedSequence. # Return: Same format as input format (except for PackedSequence returned as Variable). """ # Check if we have Torch.LongTensor inputs or not Torch.Variable (assume Numpy array in this case), take note to return same format return_numpy = False return_tensor = False if isinstance(input_seqs, (torch.LongTensor, torch.cuda.LongTensor)): input_seqs = Variable(input_seqs) return_tensor = True elif not isinstance(input_seqs, Variable): input_seqs = Variable(torch.from_numpy(input_seqs.astype('int64')).long()) return_numpy = True # If we don't have a packed inputs, let's pack it reorder_output = False if not isinstance(input_seqs, PackedSequence): ho = self.lstm_0.weight_hh_l0.data.new(2, input_seqs.size()[0], self.hidden_size).zero_() co = self.lstm_0.weight_hh_l0.data.new(2, input_seqs.size()[0], self.hidden_size).zero_() # Reorder batch by sequence length input_lengths = torch.LongTensor([torch.max(input_seqs[i, :].data.nonzero()) + 1 for i in range(input_seqs.size()[0])]) input_lengths, perm_idx = input_lengths.sort(0, descending=True) input_seqs = input_seqs[perm_idx][:, :input_lengths.max()] # Pack sequence and work on data tensor to reduce embeddings/dropout computations packed_input = pack_padded_sequence(input_seqs, input_lengths.cpu().numpy(), batch_first=True) reorder_output = True else: ho = self.lstm_0.weight_hh_l0.data.data.new(2, input_seqs.size()[0], self.hidden_size).zero_() co = self.lstm_0.weight_hh_l0.data.data.new(2, input_seqs.size()[0], self.hidden_size).zero_() input_lengths = input_seqs.batch_sizes packed_input = input_seqs hidden = (Variable(ho, requires_grad=False), Variable(co, requires_grad=False)) # Embed with an activation function to bound the values of the embeddings x = self.embed(packed_input.data) x = nn.Tanh()(x) # pyTorch 2D dropout2d operate on axis 1 which is fine for us x = self.embed_dropout(x) # Update packed sequence data for RNN packed_input = PackedSequence(x, packed_input.batch_sizes) # skip-connection from embedding to output eases gradient-flow and allows access to lower-level features # ordering of the way the merge is done is important for consistency with the pretrained model lstm_0_output, _ = self.lstm_0(packed_input, hidden) lstm_1_output, _ = self.lstm_1(lstm_0_output, hidden) # Update packed sequence data for attention layer packed_input = PackedSequence(torch.cat((lstm_1_output.data, lstm_0_output.data, packed_input.data), dim=1), packed_input.batch_sizes) input_seqs, _ = pad_packed_sequence(packed_input, batch_first=True) x, att_weights = self.attention_layer(input_seqs, input_lengths) # output class probabilities or penultimate feature vector if not self.feature_output: x = self.final_dropout(x) outputs = self.output_layer(x) else: outputs = x # Reorder output if needed if reorder_output: reorered = Variable(outputs.data.new(outputs.size())) reorered[perm_idx] = outputs outputs = reorered # Adapt return format if needed if return_tensor: outputs = outputs.data if return_numpy: outputs = outputs.data.numpy() if self.return_attention: return outputs, att_weights else: return outputs def load_specific_weights(model, weight_path, exclude_names=[], extend_embedding=0, verbose=True): """ Loads model weights from the given file path, excluding any given layers. # Arguments: model: Model whose weights should be loaded. weight_path: Path to file containing model weights. exclude_names: List of layer names whose weights should not be loaded. extend_embedding: Number of new words being added to vocabulary. verbose: Verbosity flag. # Raises: ValueError if the file at weight_path does not exist. """ if not exists(weight_path): raise ValueError('ERROR (load_weights): The weights file at {} does ' 'not exist. Refer to the README for instructions.' .format(weight_path)) if extend_embedding and 'embed' in exclude_names: raise ValueError('ERROR (load_weights): Cannot extend a vocabulary ' 'without loading the embedding weights.') # Copy only weights from the temporary model that are wanted # for the specific task (e.g. the Softmax is often ignored) weights = torch.load(weight_path) for key, weight in weights.items(): if any(excluded in key for excluded in exclude_names): if verbose: print('Ignoring weights for {}'.format(key)) continue try: model_w = model.state_dict()[key] except KeyError: raise KeyError("Weights had parameters {},".format(key) + " but could not find this parameters in model.") if verbose: print('Loading weights for {}'.format(key)) # extend embedding layer to allow new randomly initialized words # if requested. Otherwise, just load the weights for the layer. if 'embed' in key and extend_embedding > 0: weight = torch.cat((weight, model_w[NB_TOKENS:, :]), dim=0) if verbose: print('Extended vocabulary for embedding layer ' + 'from {} to {} tokens.'.format( NB_TOKENS, NB_TOKENS + extend_embedding)) try: model_w.copy_(weight) except: print('While copying the weigths named {}, whose dimensions in the model are' ' {} and whose dimensions in the saved file are {}, ...'.format( key, model_w.size(), weight.size())) raise