johnpaulbin's picture
yep
22d4f29
# -*- coding: utf-8 -*-
""" Model definition functions and weight loading.
"""
from __future__ import print_function, division, unicode_literals
from os.path import exists
import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence, PackedSequence
from torchmoji.lstm import LSTMHardSigmoid
from torchmoji.attlayer import Attention
from torchmoji.global_variables import NB_TOKENS, NB_EMOJI_CLASSES
def torchmoji_feature_encoding(weight_path, return_attention=False):
""" Loads the pretrained torchMoji model for extracting features
from the penultimate feature layer. In this way, it transforms
the text into its emotional encoding.
# Arguments:
weight_path: Path to model weights to be loaded.
return_attention: If true, output will include weight of each input token
used for the prediction
# Returns:
Pretrained model for encoding text into feature vectors.
"""
model = TorchMoji(nb_classes=None,
nb_tokens=NB_TOKENS,
feature_output=True,
return_attention=return_attention)
load_specific_weights(model, weight_path, exclude_names=['output_layer'])
return model
def torchmoji_emojis(weight_path, return_attention=False):
""" Loads the pretrained torchMoji model for extracting features
from the penultimate feature layer. In this way, it transforms
the text into its emotional encoding.
# Arguments:
weight_path: Path to model weights to be loaded.
return_attention: If true, output will include weight of each input token
used for the prediction
# Returns:
Pretrained model for encoding text into feature vectors.
"""
model = TorchMoji(nb_classes=NB_EMOJI_CLASSES,
nb_tokens=NB_TOKENS,
return_attention=return_attention)
model.load_state_dict(torch.load(weight_path))
return model
def torchmoji_transfer(nb_classes, weight_path=None, extend_embedding=0,
embed_dropout_rate=0.1, final_dropout_rate=0.5):
""" Loads the pretrained torchMoji model for finetuning/transfer learning.
Does not load weights for the softmax layer.
Note that if you are planning to use class average F1 for evaluation,
nb_classes should be set to 2 instead of the actual number of classes
in the dataset, since binary classification will be performed on each
class individually.
Note that for the 'new' method, weight_path should be left as None.
# Arguments:
nb_classes: Number of classes in the dataset.
weight_path: Path to model weights to be loaded.
extend_embedding: Number of tokens that have been added to the
vocabulary on top of NB_TOKENS. If this number is larger than 0,
the embedding layer's dimensions are adjusted accordingly, with the
additional weights being set to random values.
embed_dropout_rate: Dropout rate for the embedding layer.
final_dropout_rate: Dropout rate for the final Softmax layer.
# Returns:
Model with the given parameters.
"""
model = TorchMoji(nb_classes=nb_classes,
nb_tokens=NB_TOKENS + extend_embedding,
embed_dropout_rate=embed_dropout_rate,
final_dropout_rate=final_dropout_rate,
output_logits=True)
if weight_path is not None:
load_specific_weights(model, weight_path,
exclude_names=['output_layer'],
extend_embedding=extend_embedding)
return model
class TorchMoji(nn.Module):
def __init__(self, nb_classes, nb_tokens, feature_output=False, output_logits=False,
embed_dropout_rate=0, final_dropout_rate=0, return_attention=False):
"""
torchMoji model.
IMPORTANT: The model is loaded in evaluation mode by default (self.eval())
# Arguments:
nb_classes: Number of classes in the dataset.
nb_tokens: Number of tokens in the dataset (i.e. vocabulary size).
feature_output: If True the model returns the penultimate
feature vector rather than Softmax probabilities
(defaults to False).
output_logits: If True the model returns logits rather than probabilities
(defaults to False).
embed_dropout_rate: Dropout rate for the embedding layer.
final_dropout_rate: Dropout rate for the final Softmax layer.
return_attention: If True the model also returns attention weights over the sentence
(defaults to False).
"""
super(TorchMoji, self).__init__()
embedding_dim = 256
hidden_size = 512
attention_size = 4 * hidden_size + embedding_dim
self.feature_output = feature_output
self.embed_dropout_rate = embed_dropout_rate
self.final_dropout_rate = final_dropout_rate
self.return_attention = return_attention
self.hidden_size = hidden_size
self.output_logits = output_logits
self.nb_classes = nb_classes
self.add_module('embed', nn.Embedding(nb_tokens, embedding_dim))
# dropout2D: embedding channels are dropped out instead of words
# many exampels in the datasets contain few words that losing one or more words can alter the emotions completely
self.add_module('embed_dropout', nn.Dropout2d(embed_dropout_rate))
self.add_module('lstm_0', LSTMHardSigmoid(embedding_dim, hidden_size, batch_first=True, bidirectional=True))
self.add_module('lstm_1', LSTMHardSigmoid(hidden_size*2, hidden_size, batch_first=True, bidirectional=True))
self.add_module('attention_layer', Attention(attention_size=attention_size, return_attention=return_attention))
if not feature_output:
self.add_module('final_dropout', nn.Dropout(final_dropout_rate))
if output_logits:
self.add_module('output_layer', nn.Sequential(nn.Linear(attention_size, nb_classes if self.nb_classes > 2 else 1)))
else:
self.add_module('output_layer', nn.Sequential(nn.Linear(attention_size, nb_classes if self.nb_classes > 2 else 1),
nn.Softmax() if self.nb_classes > 2 else nn.Sigmoid()))
self.init_weights()
# Put model in evaluation mode by default
self.eval()
def init_weights(self):
"""
Here we reproduce Keras default initialization weights for consistency with Keras version
"""
ih = (param.data for name, param in self.named_parameters() if 'weight_ih' in name)
hh = (param.data for name, param in self.named_parameters() if 'weight_hh' in name)
b = (param.data for name, param in self.named_parameters() if 'bias' in name)
nn.init.uniform(self.embed.weight.data, a=-0.5, b=0.5)
for t in ih:
nn.init.xavier_uniform(t)
for t in hh:
nn.init.orthogonal(t)
for t in b:
nn.init.constant(t, 0)
if not self.feature_output:
nn.init.xavier_uniform(self.output_layer[0].weight.data)
def forward(self, input_seqs):
""" Forward pass.
# Arguments:
input_seqs: Can be one of Numpy array, Torch.LongTensor, Torch.Variable, Torch.PackedSequence.
# Return:
Same format as input format (except for PackedSequence returned as Variable).
"""
# Check if we have Torch.LongTensor inputs or not Torch.Variable (assume Numpy array in this case), take note to return same format
return_numpy = False
return_tensor = False
if isinstance(input_seqs, (torch.LongTensor, torch.cuda.LongTensor)):
input_seqs = Variable(input_seqs)
return_tensor = True
elif not isinstance(input_seqs, Variable):
input_seqs = Variable(torch.from_numpy(input_seqs.astype('int64')).long())
return_numpy = True
# If we don't have a packed inputs, let's pack it
reorder_output = False
if not isinstance(input_seqs, PackedSequence):
ho = self.lstm_0.weight_hh_l0.data.new(2, input_seqs.size()[0], self.hidden_size).zero_()
co = self.lstm_0.weight_hh_l0.data.new(2, input_seqs.size()[0], self.hidden_size).zero_()
# Reorder batch by sequence length
input_lengths = torch.LongTensor([torch.max(input_seqs[i, :].data.nonzero()) + 1 for i in range(input_seqs.size()[0])])
input_lengths, perm_idx = input_lengths.sort(0, descending=True)
input_seqs = input_seqs[perm_idx][:, :input_lengths.max()]
# Pack sequence and work on data tensor to reduce embeddings/dropout computations
packed_input = pack_padded_sequence(input_seqs, input_lengths.cpu().numpy(), batch_first=True)
reorder_output = True
else:
ho = self.lstm_0.weight_hh_l0.data.data.new(2, input_seqs.size()[0], self.hidden_size).zero_()
co = self.lstm_0.weight_hh_l0.data.data.new(2, input_seqs.size()[0], self.hidden_size).zero_()
input_lengths = input_seqs.batch_sizes
packed_input = input_seqs
hidden = (Variable(ho, requires_grad=False), Variable(co, requires_grad=False))
# Embed with an activation function to bound the values of the embeddings
x = self.embed(packed_input.data)
x = nn.Tanh()(x)
# pyTorch 2D dropout2d operate on axis 1 which is fine for us
x = self.embed_dropout(x)
# Update packed sequence data for RNN
packed_input = PackedSequence(x, packed_input.batch_sizes)
# skip-connection from embedding to output eases gradient-flow and allows access to lower-level features
# ordering of the way the merge is done is important for consistency with the pretrained model
lstm_0_output, _ = self.lstm_0(packed_input, hidden)
lstm_1_output, _ = self.lstm_1(lstm_0_output, hidden)
# Update packed sequence data for attention layer
packed_input = PackedSequence(torch.cat((lstm_1_output.data,
lstm_0_output.data,
packed_input.data), dim=1),
packed_input.batch_sizes)
input_seqs, _ = pad_packed_sequence(packed_input, batch_first=True)
x, att_weights = self.attention_layer(input_seqs, input_lengths)
# output class probabilities or penultimate feature vector
if not self.feature_output:
x = self.final_dropout(x)
outputs = self.output_layer(x)
else:
outputs = x
# Reorder output if needed
if reorder_output:
reorered = Variable(outputs.data.new(outputs.size()))
reorered[perm_idx] = outputs
outputs = reorered
# Adapt return format if needed
if return_tensor:
outputs = outputs.data
if return_numpy:
outputs = outputs.data.numpy()
if self.return_attention:
return outputs, att_weights
else:
return outputs
def load_specific_weights(model, weight_path, exclude_names=[], extend_embedding=0, verbose=True):
""" Loads model weights from the given file path, excluding any
given layers.
# Arguments:
model: Model whose weights should be loaded.
weight_path: Path to file containing model weights.
exclude_names: List of layer names whose weights should not be loaded.
extend_embedding: Number of new words being added to vocabulary.
verbose: Verbosity flag.
# Raises:
ValueError if the file at weight_path does not exist.
"""
if not exists(weight_path):
raise ValueError('ERROR (load_weights): The weights file at {} does '
'not exist. Refer to the README for instructions.'
.format(weight_path))
if extend_embedding and 'embed' in exclude_names:
raise ValueError('ERROR (load_weights): Cannot extend a vocabulary '
'without loading the embedding weights.')
# Copy only weights from the temporary model that are wanted
# for the specific task (e.g. the Softmax is often ignored)
weights = torch.load(weight_path)
for key, weight in weights.items():
if any(excluded in key for excluded in exclude_names):
if verbose:
print('Ignoring weights for {}'.format(key))
continue
try:
model_w = model.state_dict()[key]
except KeyError:
raise KeyError("Weights had parameters {},".format(key)
+ " but could not find this parameters in model.")
if verbose:
print('Loading weights for {}'.format(key))
# extend embedding layer to allow new randomly initialized words
# if requested. Otherwise, just load the weights for the layer.
if 'embed' in key and extend_embedding > 0:
weight = torch.cat((weight, model_w[NB_TOKENS:, :]), dim=0)
if verbose:
print('Extended vocabulary for embedding layer ' +
'from {} to {} tokens.'.format(
NB_TOKENS, NB_TOKENS + extend_embedding))
try:
model_w.copy_(weight)
except:
print('While copying the weigths named {}, whose dimensions in the model are'
' {} and whose dimensions in the saved file are {}, ...'.format(
key, model_w.size(), weight.size()))
raise