"""
Implementation borrowed from transformers package and extended to support multiple prediction heads:
https://github.com/huggingface/transformers/blob/master/src/transformers/models/bert/modeling_bert.py
"""
import torch
from torch import nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
from transformers.modeling_outputs import SequenceClassifierOutput
from transformers.models.bert.configuration_bert import BertConfig
from transformers.models.bert.modeling_bert import (
BertPreTrainedModel,
BERT_INPUTS_DOCSTRING,
_TOKENIZER_FOR_DOC,
_CHECKPOINT_FOR_DOC,
_CONFIG_FOR_DOC,
BertModel,
)
from transformers.file_utils import (
add_code_sample_docstrings,
add_start_docstrings_to_model_forward,
)


class BertForSequenceClassification(BertPreTrainedModel):
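    """BERT model with one sequence-classification head per task.

    Expects a ``task_labels_map`` keyword argument mapping each task name to its
    number of labels; ``forward`` selects the matching head via ``task_name``.
    Exactly two tasks are supported by this implementation.
    """
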
def __init__(self, config, **kwargs):
        # Standard Hugging Face initialization with the actual model config
        super().__init__(config)
        # task_labels_map maps each task name to its number of labels; the keys
        # route inputs in forward() and the values size the per-task heads below
        self.num_labels = kwargs.get("task_labels_map", {})
        self.config = config
self.bert = BertModel(config)
classifier_dropout = (
config.classifier_dropout
if config.classifier_dropout is not None
else config.hidden_dropout_prob
)
self.dropout = nn.Dropout(classifier_dropout)
        # Add task-specific output heads, one per entry in task_labels_map
self.classifier1 = nn.Linear(
config.hidden_size, list(self.num_labels.values())[0]
)
self.classifier2 = nn.Linear(
config.hidden_size, list(self.num_labels.values())[1]
)
self.init_weights()
@add_start_docstrings_to_model_forward(
BERT_INPUTS_DOCSTRING.format("batch_size, sequence_length")
)
@add_code_sample_docstrings(
tokenizer_class=_TOKENIZER_FOR_DOC,
checkpoint=_CHECKPOINT_FOR_DOC,
output_type=SequenceClassifierOutput,
config_class=_CONFIG_FOR_DOC,
)
def forward(
self,
input_ids=None,
attention_mask=None,
token_type_ids=None,
position_ids=None,
head_mask=None,
inputs_embeds=None,
labels=None,
output_attentions=None,
output_hidden_states=None,
return_dict=None,
task_name=None,
):
r"""
labels (:obj:`torch.LongTensor` of shape :obj:`(batch_size,)`, `optional`):
Labels for computing the sequence classification/regression loss. Indices should be in :obj:`[0, ...,
config.num_labels - 1]`. If :obj:`config.num_labels == 1` a regression loss is computed (Mean-Square loss),
If :obj:`config.num_labels > 1` a classification loss is computed (Cross-Entropy).
"""
return_dict = (
return_dict if return_dict is not None else self.config.use_return_dict
)
outputs = self.bert(
input_ids,
attention_mask=attention_mask,
token_type_ids=token_type_ids,
position_ids=position_ids,
head_mask=head_mask,
inputs_embeds=inputs_embeds,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
return_dict=return_dict,
)
pooled_output = outputs[1]
pooled_output = self.dropout(pooled_output)
        # Route the pooled [CLS] representation to the classification head
        # registered for the requested task
        logits = None
        if task_name == list(self.num_labels.keys())[0]:
            logits = self.classifier1(pooled_output)
        elif task_name == list(self.num_labels.keys())[1]:
            logits = self.classifier2(pooled_output)
        else:
            raise ValueError(f"Unknown task_name: {task_name!r}")
loss = None
if labels is not None:
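            # Infer the problem type from the per-task label count and the label
            # dtype, mirroring the upstream BertForSequenceClassification logic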
if self.config.problem_type is None:
if self.num_labels[task_name] == 1:
self.config.problem_type = "regression"
elif self.num_labels[task_name] > 1 and (
labels.dtype == torch.long or labels.dtype == torch.int
):
self.config.problem_type = "single_label_classification"
else:
self.config.problem_type = "multi_label_classification"
if self.config.problem_type == "regression":
loss_fct = MSELoss()
if self.num_labels[task_name] == 1:
loss = loss_fct(logits.squeeze(), labels.squeeze())
else:
loss = loss_fct(logits, labels)
elif self.config.problem_type == "single_label_classification":
loss_fct = CrossEntropyLoss()
loss = loss_fct(
logits.view(-1, self.num_labels[task_name]), labels.view(-1)
)
elif self.config.problem_type == "multi_label_classification":
loss_fct = BCEWithLogitsLoss()
loss = loss_fct(logits, labels)
if not return_dict:
output = (logits,) + outputs[2:]
return ((loss,) + output) if loss is not None else output
return SequenceClassifierOutput(
loss=loss,
logits=logits,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
)
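

if __name__ == "__main__":
    # Illustrative usage sketch only; not part of the borrowed implementation.
    # The task names, label counts, and randomly initialised BertConfig below are
    # hypothetical stand-ins. In practice the model would typically be created with
    # from_pretrained(<checkpoint>, task_labels_map=...) so the encoder weights are
    # pre-trained rather than random.
    task_labels_map = {"sentiment": 2, "topic": 5}
    config = BertConfig()  # small random config, purely for demonstration
    model = BertForSequenceClassification(config, task_labels_map=task_labels_map)

    batch_size, seq_len = 2, 8
    input_ids = torch.randint(0, config.vocab_size, (batch_size, seq_len))
    attention_mask = torch.ones_like(input_ids)

    # Each forward pass uses the head registered for the given task_name
    sentiment_labels = torch.tensor([0, 1])
    out = model(
        input_ids,
        attention_mask=attention_mask,
        labels=sentiment_labels,
        task_name="sentiment",
        return_dict=True,
    )
    print(out.loss, out.logits.shape)  # logits shape: (batch_size, 2)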