# ClinicalMosaic/bert_classification.py
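"""MosaicBERT-style BERT model with a sequence classification/regression head.

Defines `MosaicBertForSequenceClassification`, which wraps the local
`bert_layers_mosa.BertModel` backbone, adds a dropout + linear classifier on the
pooled output, and provides a `from_pretrained` helper for loading a raw
checkpoint state dict.
"""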
import logging
from typing import Optional, Tuple, Union
import torch
import torch.nn as nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
from torch.nn.modules.utils import consume_prefix_in_state_dict_if_present
from transformers import BertPreTrainedModel
from transformers.modeling_outputs import SequenceClassifierOutput
from bert_layers_mosa import BertModel
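# `bert_layers_mosa` is the local MosaicBERT backbone module and is expected to
# be importable (typically shipped in the same repository as this script).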
logger = logging.getLogger(__name__)
class MosaicBertForSequenceClassification(BertPreTrainedModel):
    """BERT model transformer with a sequence classification/regression head.

    The head is a single linear layer (with dropout) applied to the pooled output.
    """

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.config = config

        self.bert = BertModel(config, add_pooling_layer=True)
        classifier_dropout = (
            config.classifier_dropout
            if config.classifier_dropout is not None
            else config.hidden_dropout_prob
        )
        self.dropout = nn.Dropout(classifier_dropout)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)

        # Initialize weights and apply final processing (standard HF post-init).
        self.post_init()
    @classmethod
    def from_pretrained(
        cls, pretrained_checkpoint, state_dict=None, config=None, *inputs, **kwargs
    ):
        """Load from a pre-trained checkpoint.

        Instantiates a freshly initialized model from `config`, then loads the
        weights from `state_dict` if provided, otherwise from the file at
        `pretrained_checkpoint`.
        """
        # Build a freshly initialized model; weights are loaded below.
        model = cls(config, *inputs, **kwargs)

        # Load the checkpoint on CPU unless a state_dict was passed in directly.
        if state_dict is None:
            state_dict = torch.load(pretrained_checkpoint, map_location="cpu")

        # Strip a leading `model.` prefix (e.g. from a training-wrapper state dict)
        # so the keys match this module's parameter names.
        consume_prefix_in_state_dict_if_present(state_dict, prefix="model.")

        missing_keys, unexpected_keys = model.load_state_dict(state_dict, strict=False)
        if missing_keys:
            logger.warning(
                f"Found {len(missing_keys)} missing key(s) in the checkpoint: "
                f"{', '.join(missing_keys)}"
            )
        if unexpected_keys:
            logger.warning(
                f"Found {len(unexpected_keys)} unexpected key(s) in the checkpoint: "
                f"{', '.join(unexpected_keys)}"
            )
        return model
    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], SequenceClassifierOutput]:
        return_dict = (
            return_dict if return_dict is not None else self.config.use_return_dict
        )

        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # Classify from the pooled representation (second element of the outputs).
        pooled_output = outputs[1]
        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)
        loss = None
        if labels is not None:
            # Infer the problem type once (regression vs. single- or multi-label
            # classification) from num_labels and the label dtype, mirroring the
            # standard Hugging Face sequence-classification heads.
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (
                    labels.dtype == torch.long or labels.dtype == torch.int
                ):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                # Multi-label targets are expected as float multi-hot vectors.
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output
        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            # Hidden states and attentions are not propagated by this head.
            hidden_states=None,
            attentions=None,
        )
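

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original script): one way to load a
# checkpoint with `from_pretrained` and run a forward pass. The config path,
# checkpoint path, and tokenizer name below are placeholders/assumptions.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from transformers import AutoTokenizer, BertConfig

    # Hypothetical paths; replace with your own config JSON and checkpoint file.
    config = BertConfig.from_pretrained("config.json", num_labels=2)
    model = MosaicBertForSequenceClassification.from_pretrained(
        "checkpoint.pt", config=config
    )
    model.eval()

    # Tokenizer choice is an assumption; use whatever the checkpoint was trained with.
    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
    batch = tokenizer("Patient presents with chest pain.", return_tensors="pt")

    with torch.no_grad():
        out = model(**batch)
    print(out.logits.shape)  # (batch_size, num_labels)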